1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, BufferId, ChannelId, ChannelRole, ChannelsForUser, Database, MessageId, ProjectId,
7 RoomId, ServerId, User, UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AnyTypedEnvelope, ChannelEdge, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage, UpdateChannelBufferCollaborators,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66use util::channel::RELEASE_CHANNEL_NAME;
67
68pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
69pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73
74lazy_static! {
75 static ref METRIC_CONNECTIONS: IntGauge =
76 register_int_gauge!("connections", "number of connections").unwrap();
77 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
78 "shared_projects",
79 "number of open projects with one or more guests"
80 )
81 .unwrap();
82}
83
84type MessageHandler =
85 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
86
87struct Response<R> {
88 peer: Arc<Peer>,
89 receipt: Receipt<R>,
90 responded: Arc<AtomicBool>,
91}
92
93impl<R: RequestMessage> Response<R> {
94 fn send(self, payload: R::Response) -> Result<()> {
95 self.responded.store(true, SeqCst);
96 self.peer.respond(self.receipt, payload)?;
97 Ok(())
98 }
99}
100
101#[derive(Clone)]
102struct Session {
103 user_id: UserId,
104 connection_id: ConnectionId,
105 db: Arc<tokio::sync::Mutex<DbHandle>>,
106 peer: Arc<Peer>,
107 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
108 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
109 executor: Executor,
110}
111
112impl Session {
113 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
114 #[cfg(test)]
115 tokio::task::yield_now().await;
116 let guard = self.db.lock().await;
117 #[cfg(test)]
118 tokio::task::yield_now().await;
119 guard
120 }
121
122 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
123 #[cfg(test)]
124 tokio::task::yield_now().await;
125 let guard = self.connection_pool.lock();
126 ConnectionPoolGuard {
127 guard,
128 _not_send: PhantomData,
129 }
130 }
131}
132
133impl fmt::Debug for Session {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_struct("Session")
136 .field("user_id", &self.user_id)
137 .field("connection_id", &self.connection_id)
138 .finish()
139 }
140}
141
142struct DbHandle(Arc<Database>);
143
144impl Deref for DbHandle {
145 type Target = Database;
146
147 fn deref(&self) -> &Self::Target {
148 self.0.as_ref()
149 }
150}
151
152pub struct Server {
153 id: parking_lot::Mutex<ServerId>,
154 peer: Arc<Peer>,
155 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
156 app_state: Arc<AppState>,
157 executor: Executor,
158 handlers: HashMap<TypeId, MessageHandler>,
159 teardown: watch::Sender<()>,
160}
161
162pub(crate) struct ConnectionPoolGuard<'a> {
163 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
164 _not_send: PhantomData<Rc<()>>,
165}
166
167#[derive(Serialize)]
168pub struct ServerSnapshot<'a> {
169 peer: &'a Peer,
170 #[serde(serialize_with = "serialize_deref")]
171 connection_pool: ConnectionPoolGuard<'a>,
172}
173
174pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
175where
176 S: Serializer,
177 T: Deref<Target = U>,
178 U: Serialize,
179{
180 Serialize::serialize(value.deref(), serializer)
181}
182
183impl Server {
184 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
185 let mut server = Self {
186 id: parking_lot::Mutex::new(id),
187 peer: Peer::new(id.0 as u32),
188 app_state,
189 executor,
190 connection_pool: Default::default(),
191 handlers: Default::default(),
192 teardown: watch::channel(()).0,
193 };
194
195 server
196 .add_request_handler(ping)
197 .add_request_handler(create_room)
198 .add_request_handler(join_room)
199 .add_request_handler(rejoin_room)
200 .add_request_handler(leave_room)
201 .add_request_handler(call)
202 .add_request_handler(cancel_call)
203 .add_message_handler(decline_call)
204 .add_request_handler(update_participant_location)
205 .add_request_handler(share_project)
206 .add_message_handler(unshare_project)
207 .add_request_handler(join_project)
208 .add_message_handler(leave_project)
209 .add_request_handler(update_project)
210 .add_request_handler(update_worktree)
211 .add_message_handler(start_language_server)
212 .add_message_handler(update_language_server)
213 .add_message_handler(update_diagnostic_summary)
214 .add_message_handler(update_worktree_settings)
215 .add_message_handler(refresh_inlay_hints)
216 .add_request_handler(forward_project_request::<proto::GetHover>)
217 .add_request_handler(forward_project_request::<proto::GetDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
219 .add_request_handler(forward_project_request::<proto::GetReferences>)
220 .add_request_handler(forward_project_request::<proto::SearchProject>)
221 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
222 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
225 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
226 .add_request_handler(forward_project_request::<proto::GetCompletions>)
227 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
228 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
229 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
230 .add_request_handler(forward_project_request::<proto::PrepareRename>)
231 .add_request_handler(forward_project_request::<proto::PerformRename>)
232 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
233 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
234 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
235 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
240 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
241 .add_request_handler(forward_project_request::<proto::InlayHints>)
242 .add_message_handler(create_buffer_for_peer)
243 .add_request_handler(update_buffer)
244 .add_message_handler(update_buffer_file)
245 .add_message_handler(buffer_reloaded)
246 .add_message_handler(buffer_saved)
247 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
248 .add_request_handler(get_users)
249 .add_request_handler(fuzzy_search_users)
250 .add_request_handler(request_contact)
251 .add_request_handler(remove_contact)
252 .add_request_handler(respond_to_contact_request)
253 .add_request_handler(create_channel)
254 .add_request_handler(delete_channel)
255 .add_request_handler(invite_channel_member)
256 .add_request_handler(remove_channel_member)
257 .add_request_handler(set_channel_member_admin)
258 .add_request_handler(rename_channel)
259 .add_request_handler(join_channel_buffer)
260 .add_request_handler(leave_channel_buffer)
261 .add_message_handler(update_channel_buffer)
262 .add_request_handler(rejoin_channel_buffers)
263 .add_request_handler(get_channel_members)
264 .add_request_handler(respond_to_channel_invite)
265 .add_request_handler(join_channel)
266 .add_request_handler(join_channel_chat)
267 .add_message_handler(leave_channel_chat)
268 .add_request_handler(send_channel_message)
269 .add_request_handler(remove_channel_message)
270 .add_request_handler(get_channel_messages)
271 .add_request_handler(link_channel)
272 .add_request_handler(unlink_channel)
273 .add_request_handler(move_channel)
274 .add_request_handler(follow)
275 .add_message_handler(unfollow)
276 .add_message_handler(update_followers)
277 .add_message_handler(update_diff_base)
278 .add_request_handler(get_private_user_info)
279 .add_message_handler(acknowledge_channel_message)
280 .add_message_handler(acknowledge_buffer_version);
281
282 Arc::new(server)
283 }
284
285 pub async fn start(&self) -> Result<()> {
286 let server_id = *self.id.lock();
287 let app_state = self.app_state.clone();
288 let peer = self.peer.clone();
289 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
290 let pool = self.connection_pool.clone();
291 let live_kit_client = self.app_state.live_kit_client.clone();
292
293 let span = info_span!("start server");
294 self.executor.spawn_detached(
295 async move {
296 tracing::info!("waiting for cleanup timeout");
297 timeout.await;
298 tracing::info!("cleanup timeout expired, retrieving stale rooms");
299 if let Some((room_ids, channel_ids)) = app_state
300 .db
301 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
302 .await
303 .trace_err()
304 {
305 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
306 tracing::info!(
307 stale_channel_buffer_count = channel_ids.len(),
308 "retrieved stale channel buffers"
309 );
310
311 for channel_id in channel_ids {
312 if let Some(refreshed_channel_buffer) = app_state
313 .db
314 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
315 .await
316 .trace_err()
317 {
318 for connection_id in refreshed_channel_buffer.connection_ids {
319 peer.send(
320 connection_id,
321 proto::UpdateChannelBufferCollaborators {
322 channel_id: channel_id.to_proto(),
323 collaborators: refreshed_channel_buffer
324 .collaborators
325 .clone(),
326 },
327 )
328 .trace_err();
329 }
330 }
331 }
332
333 for room_id in room_ids {
334 let mut contacts_to_update = HashSet::default();
335 let mut canceled_calls_to_user_ids = Vec::new();
336 let mut live_kit_room = String::new();
337 let mut delete_live_kit_room = false;
338
339 if let Some(mut refreshed_room) = app_state
340 .db
341 .clear_stale_room_participants(room_id, server_id)
342 .await
343 .trace_err()
344 {
345 tracing::info!(
346 room_id = room_id.0,
347 new_participant_count = refreshed_room.room.participants.len(),
348 "refreshed room"
349 );
350 room_updated(&refreshed_room.room, &peer);
351 if let Some(channel_id) = refreshed_room.channel_id {
352 channel_updated(
353 channel_id,
354 &refreshed_room.room,
355 &refreshed_room.channel_members,
356 &peer,
357 &*pool.lock(),
358 );
359 }
360 contacts_to_update
361 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
362 contacts_to_update
363 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
364 canceled_calls_to_user_ids =
365 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
366 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
367 delete_live_kit_room = refreshed_room.room.participants.is_empty();
368 }
369
370 {
371 let pool = pool.lock();
372 for canceled_user_id in canceled_calls_to_user_ids {
373 for connection_id in pool.user_connection_ids(canceled_user_id) {
374 peer.send(
375 connection_id,
376 proto::CallCanceled {
377 room_id: room_id.to_proto(),
378 },
379 )
380 .trace_err();
381 }
382 }
383 }
384
385 for user_id in contacts_to_update {
386 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
387 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
388 if let Some((busy, contacts)) = busy.zip(contacts) {
389 let pool = pool.lock();
390 let updated_contact = contact_for_user(user_id, false, busy, &pool);
391 for contact in contacts {
392 if let db::Contact::Accepted {
393 user_id: contact_user_id,
394 ..
395 } = contact
396 {
397 for contact_conn_id in
398 pool.user_connection_ids(contact_user_id)
399 {
400 peer.send(
401 contact_conn_id,
402 proto::UpdateContacts {
403 contacts: vec![updated_contact.clone()],
404 remove_contacts: Default::default(),
405 incoming_requests: Default::default(),
406 remove_incoming_requests: Default::default(),
407 outgoing_requests: Default::default(),
408 remove_outgoing_requests: Default::default(),
409 },
410 )
411 .trace_err();
412 }
413 }
414 }
415 }
416 }
417
418 if let Some(live_kit) = live_kit_client.as_ref() {
419 if delete_live_kit_room {
420 live_kit.delete_room(live_kit_room).await.trace_err();
421 }
422 }
423 }
424 }
425
426 app_state
427 .db
428 .delete_stale_servers(&app_state.config.zed_environment, server_id)
429 .await
430 .trace_err();
431 }
432 .instrument(span),
433 );
434 Ok(())
435 }
436
437 pub fn teardown(&self) {
438 self.peer.teardown();
439 self.connection_pool.lock().reset();
440 let _ = self.teardown.send(());
441 }
442
443 #[cfg(test)]
444 pub fn reset(&self, id: ServerId) {
445 self.teardown();
446 *self.id.lock() = id;
447 self.peer.reset(id.0 as u32);
448 }
449
450 #[cfg(test)]
451 pub fn id(&self) -> ServerId {
452 *self.id.lock()
453 }
454
455 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
456 where
457 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
458 Fut: 'static + Send + Future<Output = Result<()>>,
459 M: EnvelopedMessage,
460 {
461 let prev_handler = self.handlers.insert(
462 TypeId::of::<M>(),
463 Box::new(move |envelope, session| {
464 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
465 let span = info_span!(
466 "handle message",
467 payload_type = envelope.payload_type_name()
468 );
469 span.in_scope(|| {
470 tracing::info!(
471 payload_type = envelope.payload_type_name(),
472 "message received"
473 );
474 });
475 let start_time = Instant::now();
476 let future = (handler)(*envelope, session);
477 async move {
478 let result = future.await;
479 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
480 match result {
481 Err(error) => {
482 tracing::error!(%error, ?duration_ms, "error handling message")
483 }
484 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
485 }
486 }
487 .instrument(span)
488 .boxed()
489 }),
490 );
491 if prev_handler.is_some() {
492 panic!("registered a handler for the same message twice");
493 }
494 self
495 }
496
497 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
498 where
499 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
500 Fut: 'static + Send + Future<Output = Result<()>>,
501 M: EnvelopedMessage,
502 {
503 self.add_handler(move |envelope, session| handler(envelope.payload, session));
504 self
505 }
506
507 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
508 where
509 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
510 Fut: Send + Future<Output = Result<()>>,
511 M: RequestMessage,
512 {
513 let handler = Arc::new(handler);
514 self.add_handler(move |envelope, session| {
515 let receipt = envelope.receipt();
516 let handler = handler.clone();
517 async move {
518 let peer = session.peer.clone();
519 let responded = Arc::new(AtomicBool::default());
520 let response = Response {
521 peer: peer.clone(),
522 responded: responded.clone(),
523 receipt,
524 };
525 match (handler)(envelope.payload, response, session).await {
526 Ok(()) => {
527 if responded.load(std::sync::atomic::Ordering::SeqCst) {
528 Ok(())
529 } else {
530 Err(anyhow!("handler did not send a response"))?
531 }
532 }
533 Err(error) => {
534 peer.respond_with_error(
535 receipt,
536 proto::Error {
537 message: error.to_string(),
538 },
539 )?;
540 Err(error)
541 }
542 }
543 }
544 })
545 }
546
547 pub fn handle_connection(
548 self: &Arc<Self>,
549 connection: Connection,
550 address: String,
551 user: User,
552 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
553 executor: Executor,
554 ) -> impl Future<Output = Result<()>> {
555 let this = self.clone();
556 let user_id = user.id;
557 let login = user.github_login;
558 let span = info_span!("handle connection", %user_id, %login, %address);
559 let mut teardown = self.teardown.subscribe();
560 async move {
561 let (connection_id, handle_io, mut incoming_rx) = this
562 .peer
563 .add_connection(connection, {
564 let executor = executor.clone();
565 move |duration| executor.sleep(duration)
566 });
567
568 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
569 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
570 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
571
572 if let Some(send_connection_id) = send_connection_id.take() {
573 let _ = send_connection_id.send(connection_id);
574 }
575
576 if !user.connected_once {
577 this.peer.send(connection_id, proto::ShowContacts {})?;
578 this.app_state.db.set_user_connected_once(user_id, true).await?;
579 }
580
581 let (contacts, channels_for_user, channel_invites) = future::try_join3(
582 this.app_state.db.get_contacts(user_id),
583 this.app_state.db.get_channels_for_user(user_id),
584 this.app_state.db.get_channel_invites_for_user(user_id)
585 ).await?;
586
587 {
588 let mut pool = this.connection_pool.lock();
589 pool.add_connection(connection_id, user_id, user.admin);
590 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
591 this.peer.send(connection_id, build_initial_channels_update(
592 channels_for_user,
593 channel_invites
594 ))?;
595 }
596
597 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
598 this.peer.send(connection_id, incoming_call)?;
599 }
600
601 let session = Session {
602 user_id,
603 connection_id,
604 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
605 peer: this.peer.clone(),
606 connection_pool: this.connection_pool.clone(),
607 live_kit_client: this.app_state.live_kit_client.clone(),
608 executor: executor.clone(),
609 };
610 update_user_contacts(user_id, &session).await?;
611
612 let handle_io = handle_io.fuse();
613 futures::pin_mut!(handle_io);
614
615 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
616 // This prevents deadlocks when e.g., client A performs a request to client B and
617 // client B performs a request to client A. If both clients stop processing further
618 // messages until their respective request completes, they won't have a chance to
619 // respond to the other client's request and cause a deadlock.
620 //
621 // This arrangement ensures we will attempt to process earlier messages first, but fall
622 // back to processing messages arrived later in the spirit of making progress.
623 let mut foreground_message_handlers = FuturesUnordered::new();
624 let concurrent_handlers = Arc::new(Semaphore::new(256));
625 loop {
626 let next_message = async {
627 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
628 let message = incoming_rx.next().await;
629 (permit, message)
630 }.fuse();
631 futures::pin_mut!(next_message);
632 futures::select_biased! {
633 _ = teardown.changed().fuse() => return Ok(()),
634 result = handle_io => {
635 if let Err(error) = result {
636 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
637 }
638 break;
639 }
640 _ = foreground_message_handlers.next() => {}
641 next_message = next_message => {
642 let (permit, message) = next_message;
643 if let Some(message) = message {
644 let type_name = message.payload_type_name();
645 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
646 let span_enter = span.enter();
647 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
648 let is_background = message.is_background();
649 let handle_message = (handler)(message, session.clone());
650 drop(span_enter);
651
652 let handle_message = async move {
653 handle_message.await;
654 drop(permit);
655 }.instrument(span);
656 if is_background {
657 executor.spawn_detached(handle_message);
658 } else {
659 foreground_message_handlers.push(handle_message);
660 }
661 } else {
662 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
663 }
664 } else {
665 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
666 break;
667 }
668 }
669 }
670 }
671
672 drop(foreground_message_handlers);
673 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
674 if let Err(error) = connection_lost(session, teardown, executor).await {
675 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
676 }
677
678 Ok(())
679 }.instrument(span)
680 }
681
682 pub async fn invite_code_redeemed(
683 self: &Arc<Self>,
684 inviter_id: UserId,
685 invitee_id: UserId,
686 ) -> Result<()> {
687 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
688 if let Some(code) = &user.invite_code {
689 let pool = self.connection_pool.lock();
690 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
691 for connection_id in pool.user_connection_ids(inviter_id) {
692 self.peer.send(
693 connection_id,
694 proto::UpdateContacts {
695 contacts: vec![invitee_contact.clone()],
696 ..Default::default()
697 },
698 )?;
699 self.peer.send(
700 connection_id,
701 proto::UpdateInviteInfo {
702 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
703 count: user.invite_count as u32,
704 },
705 )?;
706 }
707 }
708 }
709 Ok(())
710 }
711
712 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
713 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
714 if let Some(invite_code) = &user.invite_code {
715 let pool = self.connection_pool.lock();
716 for connection_id in pool.user_connection_ids(user_id) {
717 self.peer.send(
718 connection_id,
719 proto::UpdateInviteInfo {
720 url: format!(
721 "{}{}",
722 self.app_state.config.invite_link_prefix, invite_code
723 ),
724 count: user.invite_count as u32,
725 },
726 )?;
727 }
728 }
729 }
730 Ok(())
731 }
732
733 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
734 ServerSnapshot {
735 connection_pool: ConnectionPoolGuard {
736 guard: self.connection_pool.lock(),
737 _not_send: PhantomData,
738 },
739 peer: &self.peer,
740 }
741 }
742}
743
744impl<'a> Deref for ConnectionPoolGuard<'a> {
745 type Target = ConnectionPool;
746
747 fn deref(&self) -> &Self::Target {
748 &*self.guard
749 }
750}
751
752impl<'a> DerefMut for ConnectionPoolGuard<'a> {
753 fn deref_mut(&mut self) -> &mut Self::Target {
754 &mut *self.guard
755 }
756}
757
758impl<'a> Drop for ConnectionPoolGuard<'a> {
759 fn drop(&mut self) {
760 #[cfg(test)]
761 self.check_invariants();
762 }
763}
764
765fn broadcast<F>(
766 sender_id: Option<ConnectionId>,
767 receiver_ids: impl IntoIterator<Item = ConnectionId>,
768 mut f: F,
769) where
770 F: FnMut(ConnectionId) -> anyhow::Result<()>,
771{
772 for receiver_id in receiver_ids {
773 if Some(receiver_id) != sender_id {
774 if let Err(error) = f(receiver_id) {
775 tracing::error!("failed to send to {:?} {}", receiver_id, error);
776 }
777 }
778 }
779}
780
781lazy_static! {
782 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
783}
784
785pub struct ProtocolVersion(u32);
786
787impl Header for ProtocolVersion {
788 fn name() -> &'static HeaderName {
789 &ZED_PROTOCOL_VERSION
790 }
791
792 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
793 where
794 Self: Sized,
795 I: Iterator<Item = &'i axum::http::HeaderValue>,
796 {
797 let version = values
798 .next()
799 .ok_or_else(axum::headers::Error::invalid)?
800 .to_str()
801 .map_err(|_| axum::headers::Error::invalid())?
802 .parse()
803 .map_err(|_| axum::headers::Error::invalid())?;
804 Ok(Self(version))
805 }
806
807 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
808 values.extend([self.0.to_string().parse().unwrap()]);
809 }
810}
811
812pub fn routes(server: Arc<Server>) -> Router<Body> {
813 Router::new()
814 .route("/rpc", get(handle_websocket_request))
815 .layer(
816 ServiceBuilder::new()
817 .layer(Extension(server.app_state.clone()))
818 .layer(middleware::from_fn(auth::validate_header)),
819 )
820 .route("/metrics", get(handle_metrics))
821 .layer(Extension(server))
822}
823
824pub async fn handle_websocket_request(
825 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
826 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
827 Extension(server): Extension<Arc<Server>>,
828 Extension(user): Extension<User>,
829 ws: WebSocketUpgrade,
830) -> axum::response::Response {
831 if protocol_version != rpc::PROTOCOL_VERSION {
832 return (
833 StatusCode::UPGRADE_REQUIRED,
834 "client must be upgraded".to_string(),
835 )
836 .into_response();
837 }
838 let socket_address = socket_address.to_string();
839 ws.on_upgrade(move |socket| {
840 use util::ResultExt;
841 let socket = socket
842 .map_ok(to_tungstenite_message)
843 .err_into()
844 .with(|message| async move { Ok(to_axum_message(message)) });
845 let connection = Connection::new(Box::pin(socket));
846 async move {
847 server
848 .handle_connection(connection, socket_address, user, None, Executor::Production)
849 .await
850 .log_err();
851 }
852 })
853}
854
855pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
856 let connections = server
857 .connection_pool
858 .lock()
859 .connections()
860 .filter(|connection| !connection.admin)
861 .count();
862
863 METRIC_CONNECTIONS.set(connections as _);
864
865 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
866 METRIC_SHARED_PROJECTS.set(shared_projects as _);
867
868 let encoder = prometheus::TextEncoder::new();
869 let metric_families = prometheus::gather();
870 let encoded_metrics = encoder
871 .encode_to_string(&metric_families)
872 .map_err(|err| anyhow!("{}", err))?;
873 Ok(encoded_metrics)
874}
875
876#[instrument(err, skip(executor))]
877async fn connection_lost(
878 session: Session,
879 mut teardown: watch::Receiver<()>,
880 executor: Executor,
881) -> Result<()> {
882 session.peer.disconnect(session.connection_id);
883 session
884 .connection_pool()
885 .await
886 .remove_connection(session.connection_id)?;
887
888 session
889 .db()
890 .await
891 .connection_lost(session.connection_id)
892 .await
893 .trace_err();
894
895 futures::select_biased! {
896 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
897 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
898 leave_room_for_session(&session).await.trace_err();
899 leave_channel_buffers_for_session(&session)
900 .await
901 .trace_err();
902
903 if !session
904 .connection_pool()
905 .await
906 .is_user_online(session.user_id)
907 {
908 let db = session.db().await;
909 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
910 room_updated(&room, &session.peer);
911 }
912 }
913
914 update_user_contacts(session.user_id, &session).await?;
915 }
916 _ = teardown.changed().fuse() => {}
917 }
918
919 Ok(())
920}
921
922async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
923 response.send(proto::Ack {})?;
924 Ok(())
925}
926
927async fn create_room(
928 _request: proto::CreateRoom,
929 response: Response<proto::CreateRoom>,
930 session: Session,
931) -> Result<()> {
932 let live_kit_room = nanoid::nanoid!(30);
933
934 let live_kit_connection_info = {
935 let live_kit_room = live_kit_room.clone();
936 let live_kit = session.live_kit_client.as_ref();
937
938 util::async_iife!({
939 let live_kit = live_kit?;
940
941 let token = live_kit
942 .room_token(&live_kit_room, &session.user_id.to_string())
943 .trace_err()?;
944
945 Some(proto::LiveKitConnectionInfo {
946 server_url: live_kit.url().into(),
947 token,
948 })
949 })
950 }
951 .await;
952
953 let room = session
954 .db()
955 .await
956 .create_room(
957 session.user_id,
958 session.connection_id,
959 &live_kit_room,
960 RELEASE_CHANNEL_NAME.as_str(),
961 )
962 .await?;
963
964 response.send(proto::CreateRoomResponse {
965 room: Some(room.clone()),
966 live_kit_connection_info,
967 })?;
968
969 update_user_contacts(session.user_id, &session).await?;
970 Ok(())
971}
972
973async fn join_room(
974 request: proto::JoinRoom,
975 response: Response<proto::JoinRoom>,
976 session: Session,
977) -> Result<()> {
978 let room_id = RoomId::from_proto(request.id);
979 let joined_room = {
980 let room = session
981 .db()
982 .await
983 .join_room(
984 room_id,
985 session.user_id,
986 session.connection_id,
987 RELEASE_CHANNEL_NAME.as_str(),
988 )
989 .await?;
990 room_updated(&room.room, &session.peer);
991 room.into_inner()
992 };
993
994 if let Some(channel_id) = joined_room.channel_id {
995 channel_updated(
996 channel_id,
997 &joined_room.room,
998 &joined_room.channel_members,
999 &session.peer,
1000 &*session.connection_pool().await,
1001 )
1002 }
1003
1004 for connection_id in session
1005 .connection_pool()
1006 .await
1007 .user_connection_ids(session.user_id)
1008 {
1009 session
1010 .peer
1011 .send(
1012 connection_id,
1013 proto::CallCanceled {
1014 room_id: room_id.to_proto(),
1015 },
1016 )
1017 .trace_err();
1018 }
1019
1020 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1021 if let Some(token) = live_kit
1022 .room_token(
1023 &joined_room.room.live_kit_room,
1024 &session.user_id.to_string(),
1025 )
1026 .trace_err()
1027 {
1028 Some(proto::LiveKitConnectionInfo {
1029 server_url: live_kit.url().into(),
1030 token,
1031 })
1032 } else {
1033 None
1034 }
1035 } else {
1036 None
1037 };
1038
1039 response.send(proto::JoinRoomResponse {
1040 room: Some(joined_room.room),
1041 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1042 live_kit_connection_info,
1043 })?;
1044
1045 update_user_contacts(session.user_id, &session).await?;
1046 Ok(())
1047}
1048
1049async fn rejoin_room(
1050 request: proto::RejoinRoom,
1051 response: Response<proto::RejoinRoom>,
1052 session: Session,
1053) -> Result<()> {
1054 let room;
1055 let channel_id;
1056 let channel_members;
1057 {
1058 let mut rejoined_room = session
1059 .db()
1060 .await
1061 .rejoin_room(request, session.user_id, session.connection_id)
1062 .await?;
1063
1064 response.send(proto::RejoinRoomResponse {
1065 room: Some(rejoined_room.room.clone()),
1066 reshared_projects: rejoined_room
1067 .reshared_projects
1068 .iter()
1069 .map(|project| proto::ResharedProject {
1070 id: project.id.to_proto(),
1071 collaborators: project
1072 .collaborators
1073 .iter()
1074 .map(|collaborator| collaborator.to_proto())
1075 .collect(),
1076 })
1077 .collect(),
1078 rejoined_projects: rejoined_room
1079 .rejoined_projects
1080 .iter()
1081 .map(|rejoined_project| proto::RejoinedProject {
1082 id: rejoined_project.id.to_proto(),
1083 worktrees: rejoined_project
1084 .worktrees
1085 .iter()
1086 .map(|worktree| proto::WorktreeMetadata {
1087 id: worktree.id,
1088 root_name: worktree.root_name.clone(),
1089 visible: worktree.visible,
1090 abs_path: worktree.abs_path.clone(),
1091 })
1092 .collect(),
1093 collaborators: rejoined_project
1094 .collaborators
1095 .iter()
1096 .map(|collaborator| collaborator.to_proto())
1097 .collect(),
1098 language_servers: rejoined_project.language_servers.clone(),
1099 })
1100 .collect(),
1101 })?;
1102 room_updated(&rejoined_room.room, &session.peer);
1103
1104 for project in &rejoined_room.reshared_projects {
1105 for collaborator in &project.collaborators {
1106 session
1107 .peer
1108 .send(
1109 collaborator.connection_id,
1110 proto::UpdateProjectCollaborator {
1111 project_id: project.id.to_proto(),
1112 old_peer_id: Some(project.old_connection_id.into()),
1113 new_peer_id: Some(session.connection_id.into()),
1114 },
1115 )
1116 .trace_err();
1117 }
1118
1119 broadcast(
1120 Some(session.connection_id),
1121 project
1122 .collaborators
1123 .iter()
1124 .map(|collaborator| collaborator.connection_id),
1125 |connection_id| {
1126 session.peer.forward_send(
1127 session.connection_id,
1128 connection_id,
1129 proto::UpdateProject {
1130 project_id: project.id.to_proto(),
1131 worktrees: project.worktrees.clone(),
1132 },
1133 )
1134 },
1135 );
1136 }
1137
1138 for project in &rejoined_room.rejoined_projects {
1139 for collaborator in &project.collaborators {
1140 session
1141 .peer
1142 .send(
1143 collaborator.connection_id,
1144 proto::UpdateProjectCollaborator {
1145 project_id: project.id.to_proto(),
1146 old_peer_id: Some(project.old_connection_id.into()),
1147 new_peer_id: Some(session.connection_id.into()),
1148 },
1149 )
1150 .trace_err();
1151 }
1152 }
1153
1154 for project in &mut rejoined_room.rejoined_projects {
1155 for worktree in mem::take(&mut project.worktrees) {
1156 #[cfg(any(test, feature = "test-support"))]
1157 const MAX_CHUNK_SIZE: usize = 2;
1158 #[cfg(not(any(test, feature = "test-support")))]
1159 const MAX_CHUNK_SIZE: usize = 256;
1160
1161 // Stream this worktree's entries.
1162 let message = proto::UpdateWorktree {
1163 project_id: project.id.to_proto(),
1164 worktree_id: worktree.id,
1165 abs_path: worktree.abs_path.clone(),
1166 root_name: worktree.root_name,
1167 updated_entries: worktree.updated_entries,
1168 removed_entries: worktree.removed_entries,
1169 scan_id: worktree.scan_id,
1170 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1171 updated_repositories: worktree.updated_repositories,
1172 removed_repositories: worktree.removed_repositories,
1173 };
1174 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1175 session.peer.send(session.connection_id, update.clone())?;
1176 }
1177
1178 // Stream this worktree's diagnostics.
1179 for summary in worktree.diagnostic_summaries {
1180 session.peer.send(
1181 session.connection_id,
1182 proto::UpdateDiagnosticSummary {
1183 project_id: project.id.to_proto(),
1184 worktree_id: worktree.id,
1185 summary: Some(summary),
1186 },
1187 )?;
1188 }
1189
1190 for settings_file in worktree.settings_files {
1191 session.peer.send(
1192 session.connection_id,
1193 proto::UpdateWorktreeSettings {
1194 project_id: project.id.to_proto(),
1195 worktree_id: worktree.id,
1196 path: settings_file.path,
1197 content: Some(settings_file.content),
1198 },
1199 )?;
1200 }
1201 }
1202
1203 for language_server in &project.language_servers {
1204 session.peer.send(
1205 session.connection_id,
1206 proto::UpdateLanguageServer {
1207 project_id: project.id.to_proto(),
1208 language_server_id: language_server.id,
1209 variant: Some(
1210 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1211 proto::LspDiskBasedDiagnosticsUpdated {},
1212 ),
1213 ),
1214 },
1215 )?;
1216 }
1217 }
1218
1219 let rejoined_room = rejoined_room.into_inner();
1220
1221 room = rejoined_room.room;
1222 channel_id = rejoined_room.channel_id;
1223 channel_members = rejoined_room.channel_members;
1224 }
1225
1226 if let Some(channel_id) = channel_id {
1227 channel_updated(
1228 channel_id,
1229 &room,
1230 &channel_members,
1231 &session.peer,
1232 &*session.connection_pool().await,
1233 );
1234 }
1235
1236 update_user_contacts(session.user_id, &session).await?;
1237 Ok(())
1238}
1239
1240async fn leave_room(
1241 _: proto::LeaveRoom,
1242 response: Response<proto::LeaveRoom>,
1243 session: Session,
1244) -> Result<()> {
1245 leave_room_for_session(&session).await?;
1246 response.send(proto::Ack {})?;
1247 Ok(())
1248}
1249
1250async fn call(
1251 request: proto::Call,
1252 response: Response<proto::Call>,
1253 session: Session,
1254) -> Result<()> {
1255 let room_id = RoomId::from_proto(request.room_id);
1256 let calling_user_id = session.user_id;
1257 let calling_connection_id = session.connection_id;
1258 let called_user_id = UserId::from_proto(request.called_user_id);
1259 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1260 if !session
1261 .db()
1262 .await
1263 .has_contact(calling_user_id, called_user_id)
1264 .await?
1265 {
1266 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1267 }
1268
1269 let incoming_call = {
1270 let (room, incoming_call) = &mut *session
1271 .db()
1272 .await
1273 .call(
1274 room_id,
1275 calling_user_id,
1276 calling_connection_id,
1277 called_user_id,
1278 initial_project_id,
1279 )
1280 .await?;
1281 room_updated(&room, &session.peer);
1282 mem::take(incoming_call)
1283 };
1284 update_user_contacts(called_user_id, &session).await?;
1285
1286 let mut calls = session
1287 .connection_pool()
1288 .await
1289 .user_connection_ids(called_user_id)
1290 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1291 .collect::<FuturesUnordered<_>>();
1292
1293 while let Some(call_response) = calls.next().await {
1294 match call_response.as_ref() {
1295 Ok(_) => {
1296 response.send(proto::Ack {})?;
1297 return Ok(());
1298 }
1299 Err(_) => {
1300 call_response.trace_err();
1301 }
1302 }
1303 }
1304
1305 {
1306 let room = session
1307 .db()
1308 .await
1309 .call_failed(room_id, called_user_id)
1310 .await?;
1311 room_updated(&room, &session.peer);
1312 }
1313 update_user_contacts(called_user_id, &session).await?;
1314
1315 Err(anyhow!("failed to ring user"))?
1316}
1317
1318async fn cancel_call(
1319 request: proto::CancelCall,
1320 response: Response<proto::CancelCall>,
1321 session: Session,
1322) -> Result<()> {
1323 let called_user_id = UserId::from_proto(request.called_user_id);
1324 let room_id = RoomId::from_proto(request.room_id);
1325 {
1326 let room = session
1327 .db()
1328 .await
1329 .cancel_call(room_id, session.connection_id, called_user_id)
1330 .await?;
1331 room_updated(&room, &session.peer);
1332 }
1333
1334 for connection_id in session
1335 .connection_pool()
1336 .await
1337 .user_connection_ids(called_user_id)
1338 {
1339 session
1340 .peer
1341 .send(
1342 connection_id,
1343 proto::CallCanceled {
1344 room_id: room_id.to_proto(),
1345 },
1346 )
1347 .trace_err();
1348 }
1349 response.send(proto::Ack {})?;
1350
1351 update_user_contacts(called_user_id, &session).await?;
1352 Ok(())
1353}
1354
1355async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1356 let room_id = RoomId::from_proto(message.room_id);
1357 {
1358 let room = session
1359 .db()
1360 .await
1361 .decline_call(Some(room_id), session.user_id)
1362 .await?
1363 .ok_or_else(|| anyhow!("failed to decline call"))?;
1364 room_updated(&room, &session.peer);
1365 }
1366
1367 for connection_id in session
1368 .connection_pool()
1369 .await
1370 .user_connection_ids(session.user_id)
1371 {
1372 session
1373 .peer
1374 .send(
1375 connection_id,
1376 proto::CallCanceled {
1377 room_id: room_id.to_proto(),
1378 },
1379 )
1380 .trace_err();
1381 }
1382 update_user_contacts(session.user_id, &session).await?;
1383 Ok(())
1384}
1385
1386async fn update_participant_location(
1387 request: proto::UpdateParticipantLocation,
1388 response: Response<proto::UpdateParticipantLocation>,
1389 session: Session,
1390) -> Result<()> {
1391 let room_id = RoomId::from_proto(request.room_id);
1392 let location = request
1393 .location
1394 .ok_or_else(|| anyhow!("invalid location"))?;
1395
1396 let db = session.db().await;
1397 let room = db
1398 .update_room_participant_location(room_id, session.connection_id, location)
1399 .await?;
1400
1401 room_updated(&room, &session.peer);
1402 response.send(proto::Ack {})?;
1403 Ok(())
1404}
1405
1406async fn share_project(
1407 request: proto::ShareProject,
1408 response: Response<proto::ShareProject>,
1409 session: Session,
1410) -> Result<()> {
1411 let (project_id, room) = &*session
1412 .db()
1413 .await
1414 .share_project(
1415 RoomId::from_proto(request.room_id),
1416 session.connection_id,
1417 &request.worktrees,
1418 )
1419 .await?;
1420 response.send(proto::ShareProjectResponse {
1421 project_id: project_id.to_proto(),
1422 })?;
1423 room_updated(&room, &session.peer);
1424
1425 Ok(())
1426}
1427
1428async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1429 let project_id = ProjectId::from_proto(message.project_id);
1430
1431 let (room, guest_connection_ids) = &*session
1432 .db()
1433 .await
1434 .unshare_project(project_id, session.connection_id)
1435 .await?;
1436
1437 broadcast(
1438 Some(session.connection_id),
1439 guest_connection_ids.iter().copied(),
1440 |conn_id| session.peer.send(conn_id, message.clone()),
1441 );
1442 room_updated(&room, &session.peer);
1443
1444 Ok(())
1445}
1446
1447async fn join_project(
1448 request: proto::JoinProject,
1449 response: Response<proto::JoinProject>,
1450 session: Session,
1451) -> Result<()> {
1452 let project_id = ProjectId::from_proto(request.project_id);
1453 let guest_user_id = session.user_id;
1454
1455 tracing::info!(%project_id, "join project");
1456
1457 let (project, replica_id) = &mut *session
1458 .db()
1459 .await
1460 .join_project(project_id, session.connection_id)
1461 .await?;
1462
1463 let collaborators = project
1464 .collaborators
1465 .iter()
1466 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1467 .map(|collaborator| collaborator.to_proto())
1468 .collect::<Vec<_>>();
1469
1470 let worktrees = project
1471 .worktrees
1472 .iter()
1473 .map(|(id, worktree)| proto::WorktreeMetadata {
1474 id: *id,
1475 root_name: worktree.root_name.clone(),
1476 visible: worktree.visible,
1477 abs_path: worktree.abs_path.clone(),
1478 })
1479 .collect::<Vec<_>>();
1480
1481 for collaborator in &collaborators {
1482 session
1483 .peer
1484 .send(
1485 collaborator.peer_id.unwrap().into(),
1486 proto::AddProjectCollaborator {
1487 project_id: project_id.to_proto(),
1488 collaborator: Some(proto::Collaborator {
1489 peer_id: Some(session.connection_id.into()),
1490 replica_id: replica_id.0 as u32,
1491 user_id: guest_user_id.to_proto(),
1492 }),
1493 },
1494 )
1495 .trace_err();
1496 }
1497
1498 // First, we send the metadata associated with each worktree.
1499 response.send(proto::JoinProjectResponse {
1500 worktrees: worktrees.clone(),
1501 replica_id: replica_id.0 as u32,
1502 collaborators: collaborators.clone(),
1503 language_servers: project.language_servers.clone(),
1504 })?;
1505
1506 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1507 #[cfg(any(test, feature = "test-support"))]
1508 const MAX_CHUNK_SIZE: usize = 2;
1509 #[cfg(not(any(test, feature = "test-support")))]
1510 const MAX_CHUNK_SIZE: usize = 256;
1511
1512 // Stream this worktree's entries.
1513 let message = proto::UpdateWorktree {
1514 project_id: project_id.to_proto(),
1515 worktree_id,
1516 abs_path: worktree.abs_path.clone(),
1517 root_name: worktree.root_name,
1518 updated_entries: worktree.entries,
1519 removed_entries: Default::default(),
1520 scan_id: worktree.scan_id,
1521 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1522 updated_repositories: worktree.repository_entries.into_values().collect(),
1523 removed_repositories: Default::default(),
1524 };
1525 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1526 session.peer.send(session.connection_id, update.clone())?;
1527 }
1528
1529 // Stream this worktree's diagnostics.
1530 for summary in worktree.diagnostic_summaries {
1531 session.peer.send(
1532 session.connection_id,
1533 proto::UpdateDiagnosticSummary {
1534 project_id: project_id.to_proto(),
1535 worktree_id: worktree.id,
1536 summary: Some(summary),
1537 },
1538 )?;
1539 }
1540
1541 for settings_file in worktree.settings_files {
1542 session.peer.send(
1543 session.connection_id,
1544 proto::UpdateWorktreeSettings {
1545 project_id: project_id.to_proto(),
1546 worktree_id: worktree.id,
1547 path: settings_file.path,
1548 content: Some(settings_file.content),
1549 },
1550 )?;
1551 }
1552 }
1553
1554 for language_server in &project.language_servers {
1555 session.peer.send(
1556 session.connection_id,
1557 proto::UpdateLanguageServer {
1558 project_id: project_id.to_proto(),
1559 language_server_id: language_server.id,
1560 variant: Some(
1561 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1562 proto::LspDiskBasedDiagnosticsUpdated {},
1563 ),
1564 ),
1565 },
1566 )?;
1567 }
1568
1569 Ok(())
1570}
1571
1572async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1573 let sender_id = session.connection_id;
1574 let project_id = ProjectId::from_proto(request.project_id);
1575
1576 let (room, project) = &*session
1577 .db()
1578 .await
1579 .leave_project(project_id, sender_id)
1580 .await?;
1581 tracing::info!(
1582 %project_id,
1583 host_user_id = %project.host_user_id,
1584 host_connection_id = %project.host_connection_id,
1585 "leave project"
1586 );
1587
1588 project_left(&project, &session);
1589 room_updated(&room, &session.peer);
1590
1591 Ok(())
1592}
1593
1594async fn update_project(
1595 request: proto::UpdateProject,
1596 response: Response<proto::UpdateProject>,
1597 session: Session,
1598) -> Result<()> {
1599 let project_id = ProjectId::from_proto(request.project_id);
1600 let (room, guest_connection_ids) = &*session
1601 .db()
1602 .await
1603 .update_project(project_id, session.connection_id, &request.worktrees)
1604 .await?;
1605 broadcast(
1606 Some(session.connection_id),
1607 guest_connection_ids.iter().copied(),
1608 |connection_id| {
1609 session
1610 .peer
1611 .forward_send(session.connection_id, connection_id, request.clone())
1612 },
1613 );
1614 room_updated(&room, &session.peer);
1615 response.send(proto::Ack {})?;
1616
1617 Ok(())
1618}
1619
1620async fn update_worktree(
1621 request: proto::UpdateWorktree,
1622 response: Response<proto::UpdateWorktree>,
1623 session: Session,
1624) -> Result<()> {
1625 let guest_connection_ids = session
1626 .db()
1627 .await
1628 .update_worktree(&request, session.connection_id)
1629 .await?;
1630
1631 broadcast(
1632 Some(session.connection_id),
1633 guest_connection_ids.iter().copied(),
1634 |connection_id| {
1635 session
1636 .peer
1637 .forward_send(session.connection_id, connection_id, request.clone())
1638 },
1639 );
1640 response.send(proto::Ack {})?;
1641 Ok(())
1642}
1643
1644async fn update_diagnostic_summary(
1645 message: proto::UpdateDiagnosticSummary,
1646 session: Session,
1647) -> Result<()> {
1648 let guest_connection_ids = session
1649 .db()
1650 .await
1651 .update_diagnostic_summary(&message, session.connection_id)
1652 .await?;
1653
1654 broadcast(
1655 Some(session.connection_id),
1656 guest_connection_ids.iter().copied(),
1657 |connection_id| {
1658 session
1659 .peer
1660 .forward_send(session.connection_id, connection_id, message.clone())
1661 },
1662 );
1663
1664 Ok(())
1665}
1666
1667async fn update_worktree_settings(
1668 message: proto::UpdateWorktreeSettings,
1669 session: Session,
1670) -> Result<()> {
1671 let guest_connection_ids = session
1672 .db()
1673 .await
1674 .update_worktree_settings(&message, session.connection_id)
1675 .await?;
1676
1677 broadcast(
1678 Some(session.connection_id),
1679 guest_connection_ids.iter().copied(),
1680 |connection_id| {
1681 session
1682 .peer
1683 .forward_send(session.connection_id, connection_id, message.clone())
1684 },
1685 );
1686
1687 Ok(())
1688}
1689
1690async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1691 broadcast_project_message(request.project_id, request, session).await
1692}
1693
1694async fn start_language_server(
1695 request: proto::StartLanguageServer,
1696 session: Session,
1697) -> Result<()> {
1698 let guest_connection_ids = session
1699 .db()
1700 .await
1701 .start_language_server(&request, session.connection_id)
1702 .await?;
1703
1704 broadcast(
1705 Some(session.connection_id),
1706 guest_connection_ids.iter().copied(),
1707 |connection_id| {
1708 session
1709 .peer
1710 .forward_send(session.connection_id, connection_id, request.clone())
1711 },
1712 );
1713 Ok(())
1714}
1715
1716async fn update_language_server(
1717 request: proto::UpdateLanguageServer,
1718 session: Session,
1719) -> Result<()> {
1720 session.executor.record_backtrace();
1721 let project_id = ProjectId::from_proto(request.project_id);
1722 let project_connection_ids = session
1723 .db()
1724 .await
1725 .project_connection_ids(project_id, session.connection_id)
1726 .await?;
1727 broadcast(
1728 Some(session.connection_id),
1729 project_connection_ids.iter().copied(),
1730 |connection_id| {
1731 session
1732 .peer
1733 .forward_send(session.connection_id, connection_id, request.clone())
1734 },
1735 );
1736 Ok(())
1737}
1738
1739async fn forward_project_request<T>(
1740 request: T,
1741 response: Response<T>,
1742 session: Session,
1743) -> Result<()>
1744where
1745 T: EntityMessage + RequestMessage,
1746{
1747 session.executor.record_backtrace();
1748 let project_id = ProjectId::from_proto(request.remote_entity_id());
1749 let host_connection_id = {
1750 let collaborators = session
1751 .db()
1752 .await
1753 .project_collaborators(project_id, session.connection_id)
1754 .await?;
1755 collaborators
1756 .iter()
1757 .find(|collaborator| collaborator.is_host)
1758 .ok_or_else(|| anyhow!("host not found"))?
1759 .connection_id
1760 };
1761
1762 let payload = session
1763 .peer
1764 .forward_request(session.connection_id, host_connection_id, request)
1765 .await?;
1766
1767 response.send(payload)?;
1768 Ok(())
1769}
1770
1771async fn create_buffer_for_peer(
1772 request: proto::CreateBufferForPeer,
1773 session: Session,
1774) -> Result<()> {
1775 session.executor.record_backtrace();
1776 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1777 session
1778 .peer
1779 .forward_send(session.connection_id, peer_id.into(), request)?;
1780 Ok(())
1781}
1782
1783async fn update_buffer(
1784 request: proto::UpdateBuffer,
1785 response: Response<proto::UpdateBuffer>,
1786 session: Session,
1787) -> Result<()> {
1788 session.executor.record_backtrace();
1789 let project_id = ProjectId::from_proto(request.project_id);
1790 let mut guest_connection_ids;
1791 let mut host_connection_id = None;
1792 {
1793 let collaborators = session
1794 .db()
1795 .await
1796 .project_collaborators(project_id, session.connection_id)
1797 .await?;
1798 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1799 for collaborator in collaborators.iter() {
1800 if collaborator.is_host {
1801 host_connection_id = Some(collaborator.connection_id);
1802 } else {
1803 guest_connection_ids.push(collaborator.connection_id);
1804 }
1805 }
1806 }
1807 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1808
1809 session.executor.record_backtrace();
1810 broadcast(
1811 Some(session.connection_id),
1812 guest_connection_ids,
1813 |connection_id| {
1814 session
1815 .peer
1816 .forward_send(session.connection_id, connection_id, request.clone())
1817 },
1818 );
1819 if host_connection_id != session.connection_id {
1820 session
1821 .peer
1822 .forward_request(session.connection_id, host_connection_id, request.clone())
1823 .await?;
1824 }
1825
1826 response.send(proto::Ack {})?;
1827 Ok(())
1828}
1829
1830async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1831 let project_id = ProjectId::from_proto(request.project_id);
1832 let project_connection_ids = session
1833 .db()
1834 .await
1835 .project_connection_ids(project_id, session.connection_id)
1836 .await?;
1837
1838 broadcast(
1839 Some(session.connection_id),
1840 project_connection_ids.iter().copied(),
1841 |connection_id| {
1842 session
1843 .peer
1844 .forward_send(session.connection_id, connection_id, request.clone())
1845 },
1846 );
1847 Ok(())
1848}
1849
1850async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1851 let project_id = ProjectId::from_proto(request.project_id);
1852 let project_connection_ids = session
1853 .db()
1854 .await
1855 .project_connection_ids(project_id, session.connection_id)
1856 .await?;
1857 broadcast(
1858 Some(session.connection_id),
1859 project_connection_ids.iter().copied(),
1860 |connection_id| {
1861 session
1862 .peer
1863 .forward_send(session.connection_id, connection_id, request.clone())
1864 },
1865 );
1866 Ok(())
1867}
1868
1869async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1870 broadcast_project_message(request.project_id, request, session).await
1871}
1872
1873async fn broadcast_project_message<T: EnvelopedMessage>(
1874 project_id: u64,
1875 request: T,
1876 session: Session,
1877) -> Result<()> {
1878 let project_id = ProjectId::from_proto(project_id);
1879 let project_connection_ids = session
1880 .db()
1881 .await
1882 .project_connection_ids(project_id, session.connection_id)
1883 .await?;
1884 broadcast(
1885 Some(session.connection_id),
1886 project_connection_ids.iter().copied(),
1887 |connection_id| {
1888 session
1889 .peer
1890 .forward_send(session.connection_id, connection_id, request.clone())
1891 },
1892 );
1893 Ok(())
1894}
1895
1896async fn follow(
1897 request: proto::Follow,
1898 response: Response<proto::Follow>,
1899 session: Session,
1900) -> Result<()> {
1901 let room_id = RoomId::from_proto(request.room_id);
1902 let project_id = request.project_id.map(ProjectId::from_proto);
1903 let leader_id = request
1904 .leader_id
1905 .ok_or_else(|| anyhow!("invalid leader id"))?
1906 .into();
1907 let follower_id = session.connection_id;
1908
1909 session
1910 .db()
1911 .await
1912 .check_room_participants(room_id, leader_id, session.connection_id)
1913 .await?;
1914
1915 let response_payload = session
1916 .peer
1917 .forward_request(session.connection_id, leader_id, request)
1918 .await?;
1919 response.send(response_payload)?;
1920
1921 if let Some(project_id) = project_id {
1922 let room = session
1923 .db()
1924 .await
1925 .follow(room_id, project_id, leader_id, follower_id)
1926 .await?;
1927 room_updated(&room, &session.peer);
1928 }
1929
1930 Ok(())
1931}
1932
1933async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1934 let room_id = RoomId::from_proto(request.room_id);
1935 let project_id = request.project_id.map(ProjectId::from_proto);
1936 let leader_id = request
1937 .leader_id
1938 .ok_or_else(|| anyhow!("invalid leader id"))?
1939 .into();
1940 let follower_id = session.connection_id;
1941
1942 session
1943 .db()
1944 .await
1945 .check_room_participants(room_id, leader_id, session.connection_id)
1946 .await?;
1947
1948 session
1949 .peer
1950 .forward_send(session.connection_id, leader_id, request)?;
1951
1952 if let Some(project_id) = project_id {
1953 let room = session
1954 .db()
1955 .await
1956 .unfollow(room_id, project_id, leader_id, follower_id)
1957 .await?;
1958 room_updated(&room, &session.peer);
1959 }
1960
1961 Ok(())
1962}
1963
1964async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1965 let room_id = RoomId::from_proto(request.room_id);
1966 let database = session.db.lock().await;
1967
1968 let connection_ids = if let Some(project_id) = request.project_id {
1969 let project_id = ProjectId::from_proto(project_id);
1970 database
1971 .project_connection_ids(project_id, session.connection_id)
1972 .await?
1973 } else {
1974 database
1975 .room_connection_ids(room_id, session.connection_id)
1976 .await?
1977 };
1978
1979 // For now, don't send view update messages back to that view's current leader.
1980 let connection_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
1981 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1982 _ => None,
1983 });
1984
1985 for follower_peer_id in request.follower_ids.iter().copied() {
1986 let follower_connection_id = follower_peer_id.into();
1987 if Some(follower_peer_id) != connection_id_to_omit
1988 && connection_ids.contains(&follower_connection_id)
1989 {
1990 session.peer.forward_send(
1991 session.connection_id,
1992 follower_connection_id,
1993 request.clone(),
1994 )?;
1995 }
1996 }
1997 Ok(())
1998}
1999
2000async fn get_users(
2001 request: proto::GetUsers,
2002 response: Response<proto::GetUsers>,
2003 session: Session,
2004) -> Result<()> {
2005 let user_ids = request
2006 .user_ids
2007 .into_iter()
2008 .map(UserId::from_proto)
2009 .collect();
2010 let users = session
2011 .db()
2012 .await
2013 .get_users_by_ids(user_ids)
2014 .await?
2015 .into_iter()
2016 .map(|user| proto::User {
2017 id: user.id.to_proto(),
2018 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2019 github_login: user.github_login,
2020 })
2021 .collect();
2022 response.send(proto::UsersResponse { users })?;
2023 Ok(())
2024}
2025
2026async fn fuzzy_search_users(
2027 request: proto::FuzzySearchUsers,
2028 response: Response<proto::FuzzySearchUsers>,
2029 session: Session,
2030) -> Result<()> {
2031 let query = request.query;
2032 let users = match query.len() {
2033 0 => vec![],
2034 1 | 2 => session
2035 .db()
2036 .await
2037 .get_user_by_github_login(&query)
2038 .await?
2039 .into_iter()
2040 .collect(),
2041 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2042 };
2043 let users = users
2044 .into_iter()
2045 .filter(|user| user.id != session.user_id)
2046 .map(|user| proto::User {
2047 id: user.id.to_proto(),
2048 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2049 github_login: user.github_login,
2050 })
2051 .collect();
2052 response.send(proto::UsersResponse { users })?;
2053 Ok(())
2054}
2055
2056async fn request_contact(
2057 request: proto::RequestContact,
2058 response: Response<proto::RequestContact>,
2059 session: Session,
2060) -> Result<()> {
2061 let requester_id = session.user_id;
2062 let responder_id = UserId::from_proto(request.responder_id);
2063 if requester_id == responder_id {
2064 return Err(anyhow!("cannot add yourself as a contact"))?;
2065 }
2066
2067 session
2068 .db()
2069 .await
2070 .send_contact_request(requester_id, responder_id)
2071 .await?;
2072
2073 // Update outgoing contact requests of requester
2074 let mut update = proto::UpdateContacts::default();
2075 update.outgoing_requests.push(responder_id.to_proto());
2076 for connection_id in session
2077 .connection_pool()
2078 .await
2079 .user_connection_ids(requester_id)
2080 {
2081 session.peer.send(connection_id, update.clone())?;
2082 }
2083
2084 // Update incoming contact requests of responder
2085 let mut update = proto::UpdateContacts::default();
2086 update
2087 .incoming_requests
2088 .push(proto::IncomingContactRequest {
2089 requester_id: requester_id.to_proto(),
2090 should_notify: true,
2091 });
2092 for connection_id in session
2093 .connection_pool()
2094 .await
2095 .user_connection_ids(responder_id)
2096 {
2097 session.peer.send(connection_id, update.clone())?;
2098 }
2099
2100 response.send(proto::Ack {})?;
2101 Ok(())
2102}
2103
2104async fn respond_to_contact_request(
2105 request: proto::RespondToContactRequest,
2106 response: Response<proto::RespondToContactRequest>,
2107 session: Session,
2108) -> Result<()> {
2109 let responder_id = session.user_id;
2110 let requester_id = UserId::from_proto(request.requester_id);
2111 let db = session.db().await;
2112 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2113 db.dismiss_contact_notification(responder_id, requester_id)
2114 .await?;
2115 } else {
2116 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2117
2118 db.respond_to_contact_request(responder_id, requester_id, accept)
2119 .await?;
2120 let requester_busy = db.is_user_busy(requester_id).await?;
2121 let responder_busy = db.is_user_busy(responder_id).await?;
2122
2123 let pool = session.connection_pool().await;
2124 // Update responder with new contact
2125 let mut update = proto::UpdateContacts::default();
2126 if accept {
2127 update
2128 .contacts
2129 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2130 }
2131 update
2132 .remove_incoming_requests
2133 .push(requester_id.to_proto());
2134 for connection_id in pool.user_connection_ids(responder_id) {
2135 session.peer.send(connection_id, update.clone())?;
2136 }
2137
2138 // Update requester with new contact
2139 let mut update = proto::UpdateContacts::default();
2140 if accept {
2141 update
2142 .contacts
2143 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2144 }
2145 update
2146 .remove_outgoing_requests
2147 .push(responder_id.to_proto());
2148 for connection_id in pool.user_connection_ids(requester_id) {
2149 session.peer.send(connection_id, update.clone())?;
2150 }
2151 }
2152
2153 response.send(proto::Ack {})?;
2154 Ok(())
2155}
2156
2157async fn remove_contact(
2158 request: proto::RemoveContact,
2159 response: Response<proto::RemoveContact>,
2160 session: Session,
2161) -> Result<()> {
2162 let requester_id = session.user_id;
2163 let responder_id = UserId::from_proto(request.user_id);
2164 let db = session.db().await;
2165 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2166
2167 let pool = session.connection_pool().await;
2168 // Update outgoing contact requests of requester
2169 let mut update = proto::UpdateContacts::default();
2170 if contact_accepted {
2171 update.remove_contacts.push(responder_id.to_proto());
2172 } else {
2173 update
2174 .remove_outgoing_requests
2175 .push(responder_id.to_proto());
2176 }
2177 for connection_id in pool.user_connection_ids(requester_id) {
2178 session.peer.send(connection_id, update.clone())?;
2179 }
2180
2181 // Update incoming contact requests of responder
2182 let mut update = proto::UpdateContacts::default();
2183 if contact_accepted {
2184 update.remove_contacts.push(requester_id.to_proto());
2185 } else {
2186 update
2187 .remove_incoming_requests
2188 .push(requester_id.to_proto());
2189 }
2190 for connection_id in pool.user_connection_ids(responder_id) {
2191 session.peer.send(connection_id, update.clone())?;
2192 }
2193
2194 response.send(proto::Ack {})?;
2195 Ok(())
2196}
2197
2198async fn create_channel(
2199 request: proto::CreateChannel,
2200 response: Response<proto::CreateChannel>,
2201 session: Session,
2202) -> Result<()> {
2203 let db = session.db().await;
2204
2205 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2206 let id = db
2207 .create_channel(&request.name, parent_id, session.user_id)
2208 .await?;
2209
2210 let channel = proto::Channel {
2211 id: id.to_proto(),
2212 name: request.name,
2213 };
2214
2215 response.send(proto::CreateChannelResponse {
2216 channel: Some(channel.clone()),
2217 parent_id: request.parent_id,
2218 })?;
2219
2220 let Some(parent_id) = parent_id else {
2221 return Ok(());
2222 };
2223
2224 let update = proto::UpdateChannels {
2225 channels: vec![channel],
2226 insert_edge: vec![ChannelEdge {
2227 parent_id: parent_id.to_proto(),
2228 channel_id: id.to_proto(),
2229 }],
2230 ..Default::default()
2231 };
2232
2233 let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2234
2235 let connection_pool = session.connection_pool().await;
2236 for user_id in user_ids_to_notify {
2237 for connection_id in connection_pool.user_connection_ids(user_id) {
2238 if user_id == session.user_id {
2239 continue;
2240 }
2241 session.peer.send(connection_id, update.clone())?;
2242 }
2243 }
2244
2245 Ok(())
2246}
2247
2248async fn delete_channel(
2249 request: proto::DeleteChannel,
2250 response: Response<proto::DeleteChannel>,
2251 session: Session,
2252) -> Result<()> {
2253 let db = session.db().await;
2254
2255 let channel_id = request.channel_id;
2256 let (removed_channels, member_ids) = db
2257 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2258 .await?;
2259 response.send(proto::Ack {})?;
2260
2261 // Notify members of removed channels
2262 let mut update = proto::UpdateChannels::default();
2263 update
2264 .delete_channels
2265 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2266
2267 let connection_pool = session.connection_pool().await;
2268 for member_id in member_ids {
2269 for connection_id in connection_pool.user_connection_ids(member_id) {
2270 session.peer.send(connection_id, update.clone())?;
2271 }
2272 }
2273
2274 Ok(())
2275}
2276
2277async fn invite_channel_member(
2278 request: proto::InviteChannelMember,
2279 response: Response<proto::InviteChannelMember>,
2280 session: Session,
2281) -> Result<()> {
2282 let db = session.db().await;
2283 let channel_id = ChannelId::from_proto(request.channel_id);
2284 let invitee_id = UserId::from_proto(request.user_id);
2285 let role = if request.admin {
2286 ChannelRole::Admin
2287 } else {
2288 ChannelRole::Member
2289 };
2290 db.invite_channel_member(channel_id, invitee_id, session.user_id, role)
2291 .await?;
2292
2293 let (channel, _) = db
2294 .get_channel(channel_id, session.user_id)
2295 .await?
2296 .ok_or_else(|| anyhow!("channel not found"))?;
2297
2298 let mut update = proto::UpdateChannels::default();
2299 update.channel_invitations.push(proto::Channel {
2300 id: channel.id.to_proto(),
2301 name: channel.name,
2302 });
2303 for connection_id in session
2304 .connection_pool()
2305 .await
2306 .user_connection_ids(invitee_id)
2307 {
2308 session.peer.send(connection_id, update.clone())?;
2309 }
2310
2311 response.send(proto::Ack {})?;
2312 Ok(())
2313}
2314
2315async fn remove_channel_member(
2316 request: proto::RemoveChannelMember,
2317 response: Response<proto::RemoveChannelMember>,
2318 session: Session,
2319) -> Result<()> {
2320 let db = session.db().await;
2321 let channel_id = ChannelId::from_proto(request.channel_id);
2322 let member_id = UserId::from_proto(request.user_id);
2323
2324 db.remove_channel_member(channel_id, member_id, session.user_id)
2325 .await?;
2326
2327 let mut update = proto::UpdateChannels::default();
2328 update.delete_channels.push(channel_id.to_proto());
2329
2330 for connection_id in session
2331 .connection_pool()
2332 .await
2333 .user_connection_ids(member_id)
2334 {
2335 session.peer.send(connection_id, update.clone())?;
2336 }
2337
2338 response.send(proto::Ack {})?;
2339 Ok(())
2340}
2341
2342async fn set_channel_member_admin(
2343 request: proto::SetChannelMemberAdmin,
2344 response: Response<proto::SetChannelMemberAdmin>,
2345 session: Session,
2346) -> Result<()> {
2347 let db = session.db().await;
2348 let channel_id = ChannelId::from_proto(request.channel_id);
2349 let member_id = UserId::from_proto(request.user_id);
2350 let role = if request.admin {
2351 ChannelRole::Admin
2352 } else {
2353 ChannelRole::Member
2354 };
2355 db.set_channel_member_role(channel_id, session.user_id, member_id, role)
2356 .await?;
2357
2358 let (channel, has_accepted) = db
2359 .get_channel(channel_id, member_id)
2360 .await?
2361 .ok_or_else(|| anyhow!("channel not found"))?;
2362
2363 let mut update = proto::UpdateChannels::default();
2364 if has_accepted {
2365 update.channel_permissions.push(proto::ChannelPermission {
2366 channel_id: channel.id.to_proto(),
2367 is_admin: request.admin,
2368 });
2369 }
2370
2371 for connection_id in session
2372 .connection_pool()
2373 .await
2374 .user_connection_ids(member_id)
2375 {
2376 session.peer.send(connection_id, update.clone())?;
2377 }
2378
2379 response.send(proto::Ack {})?;
2380 Ok(())
2381}
2382
2383async fn rename_channel(
2384 request: proto::RenameChannel,
2385 response: Response<proto::RenameChannel>,
2386 session: Session,
2387) -> Result<()> {
2388 let db = session.db().await;
2389 let channel_id = ChannelId::from_proto(request.channel_id);
2390 let new_name = db
2391 .rename_channel(channel_id, session.user_id, &request.name)
2392 .await?;
2393
2394 let channel = proto::Channel {
2395 id: request.channel_id,
2396 name: new_name,
2397 };
2398 response.send(proto::RenameChannelResponse {
2399 channel: Some(channel.clone()),
2400 })?;
2401 let mut update = proto::UpdateChannels::default();
2402 update.channels.push(channel);
2403
2404 let member_ids = db.get_channel_members(channel_id).await?;
2405
2406 let connection_pool = session.connection_pool().await;
2407 for member_id in member_ids {
2408 for connection_id in connection_pool.user_connection_ids(member_id) {
2409 session.peer.send(connection_id, update.clone())?;
2410 }
2411 }
2412
2413 Ok(())
2414}
2415
2416async fn link_channel(
2417 request: proto::LinkChannel,
2418 response: Response<proto::LinkChannel>,
2419 session: Session,
2420) -> Result<()> {
2421 let db = session.db().await;
2422 let channel_id = ChannelId::from_proto(request.channel_id);
2423 let to = ChannelId::from_proto(request.to);
2424 let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2425
2426 let members = db.get_channel_members(to).await?;
2427 let connection_pool = session.connection_pool().await;
2428 let update = proto::UpdateChannels {
2429 channels: channels_to_send
2430 .channels
2431 .into_iter()
2432 .map(|channel| proto::Channel {
2433 id: channel.id.to_proto(),
2434 name: channel.name,
2435 })
2436 .collect(),
2437 insert_edge: channels_to_send.edges,
2438 ..Default::default()
2439 };
2440 for member_id in members {
2441 for connection_id in connection_pool.user_connection_ids(member_id) {
2442 session.peer.send(connection_id, update.clone())?;
2443 }
2444 }
2445
2446 response.send(Ack {})?;
2447
2448 Ok(())
2449}
2450
2451async fn unlink_channel(
2452 request: proto::UnlinkChannel,
2453 response: Response<proto::UnlinkChannel>,
2454 session: Session,
2455) -> Result<()> {
2456 let db = session.db().await;
2457 let channel_id = ChannelId::from_proto(request.channel_id);
2458 let from = ChannelId::from_proto(request.from);
2459
2460 db.unlink_channel(session.user_id, channel_id, from).await?;
2461
2462 let members = db.get_channel_members(from).await?;
2463
2464 let update = proto::UpdateChannels {
2465 delete_edge: vec![proto::ChannelEdge {
2466 channel_id: channel_id.to_proto(),
2467 parent_id: from.to_proto(),
2468 }],
2469 ..Default::default()
2470 };
2471 let connection_pool = session.connection_pool().await;
2472 for member_id in members {
2473 for connection_id in connection_pool.user_connection_ids(member_id) {
2474 session.peer.send(connection_id, update.clone())?;
2475 }
2476 }
2477
2478 response.send(Ack {})?;
2479
2480 Ok(())
2481}
2482
2483async fn move_channel(
2484 request: proto::MoveChannel,
2485 response: Response<proto::MoveChannel>,
2486 session: Session,
2487) -> Result<()> {
2488 let db = session.db().await;
2489 let channel_id = ChannelId::from_proto(request.channel_id);
2490 let from_parent = ChannelId::from_proto(request.from);
2491 let to = ChannelId::from_proto(request.to);
2492
2493 let channels_to_send = db
2494 .move_channel(session.user_id, channel_id, from_parent, to)
2495 .await?;
2496
2497 if channels_to_send.is_empty() {
2498 response.send(Ack {})?;
2499 return Ok(());
2500 }
2501
2502 let members_from = db.get_channel_members(from_parent).await?;
2503 let members_to = db.get_channel_members(to).await?;
2504
2505 let update = proto::UpdateChannels {
2506 delete_edge: vec![proto::ChannelEdge {
2507 channel_id: channel_id.to_proto(),
2508 parent_id: from_parent.to_proto(),
2509 }],
2510 ..Default::default()
2511 };
2512 let connection_pool = session.connection_pool().await;
2513 for member_id in members_from {
2514 for connection_id in connection_pool.user_connection_ids(member_id) {
2515 session.peer.send(connection_id, update.clone())?;
2516 }
2517 }
2518
2519 let update = proto::UpdateChannels {
2520 channels: channels_to_send
2521 .channels
2522 .into_iter()
2523 .map(|channel| proto::Channel {
2524 id: channel.id.to_proto(),
2525 name: channel.name,
2526 })
2527 .collect(),
2528 insert_edge: channels_to_send.edges,
2529 ..Default::default()
2530 };
2531 for member_id in members_to {
2532 for connection_id in connection_pool.user_connection_ids(member_id) {
2533 session.peer.send(connection_id, update.clone())?;
2534 }
2535 }
2536
2537 response.send(Ack {})?;
2538
2539 Ok(())
2540}
2541
2542async fn get_channel_members(
2543 request: proto::GetChannelMembers,
2544 response: Response<proto::GetChannelMembers>,
2545 session: Session,
2546) -> Result<()> {
2547 let db = session.db().await;
2548 let channel_id = ChannelId::from_proto(request.channel_id);
2549 let members = db
2550 .get_channel_member_details(channel_id, session.user_id)
2551 .await?;
2552 response.send(proto::GetChannelMembersResponse { members })?;
2553 Ok(())
2554}
2555
2556async fn respond_to_channel_invite(
2557 request: proto::RespondToChannelInvite,
2558 response: Response<proto::RespondToChannelInvite>,
2559 session: Session,
2560) -> Result<()> {
2561 let db = session.db().await;
2562 let channel_id = ChannelId::from_proto(request.channel_id);
2563 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2564 .await?;
2565
2566 let mut update = proto::UpdateChannels::default();
2567 update
2568 .remove_channel_invitations
2569 .push(channel_id.to_proto());
2570 if request.accept {
2571 let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2572 update
2573 .channels
2574 .extend(
2575 result
2576 .channels
2577 .channels
2578 .into_iter()
2579 .map(|channel| proto::Channel {
2580 id: channel.id.to_proto(),
2581 name: channel.name,
2582 }),
2583 );
2584 update.unseen_channel_messages = result.channel_messages;
2585 update.unseen_channel_buffer_changes = result.unseen_buffer_changes;
2586 update.insert_edge = result.channels.edges;
2587 update
2588 .channel_participants
2589 .extend(
2590 result
2591 .channel_participants
2592 .into_iter()
2593 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2594 channel_id: channel_id.to_proto(),
2595 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2596 }),
2597 );
2598 update
2599 .channel_permissions
2600 .extend(
2601 result
2602 .channels_with_admin_privileges
2603 .into_iter()
2604 .map(|channel_id| proto::ChannelPermission {
2605 channel_id: channel_id.to_proto(),
2606 is_admin: true,
2607 }),
2608 );
2609 }
2610 session.peer.send(session.connection_id, update)?;
2611 response.send(proto::Ack {})?;
2612
2613 Ok(())
2614}
2615
2616async fn join_channel(
2617 request: proto::JoinChannel,
2618 response: Response<proto::JoinChannel>,
2619 session: Session,
2620) -> Result<()> {
2621 let channel_id = ChannelId::from_proto(request.channel_id);
2622 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2623
2624 let joined_room = {
2625 leave_room_for_session(&session).await?;
2626 let db = session.db().await;
2627
2628 let room_id = db
2629 .get_or_create_channel_room(channel_id, &live_kit_room, &*RELEASE_CHANNEL_NAME)
2630 .await?;
2631
2632 let joined_room = db
2633 .join_room(
2634 room_id,
2635 session.user_id,
2636 session.connection_id,
2637 RELEASE_CHANNEL_NAME.as_str(),
2638 )
2639 .await?;
2640
2641 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2642 let token = live_kit
2643 .room_token(
2644 &joined_room.room.live_kit_room,
2645 &session.user_id.to_string(),
2646 )
2647 .trace_err()?;
2648
2649 Some(LiveKitConnectionInfo {
2650 server_url: live_kit.url().into(),
2651 token,
2652 })
2653 });
2654
2655 response.send(proto::JoinRoomResponse {
2656 room: Some(joined_room.room.clone()),
2657 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2658 live_kit_connection_info,
2659 })?;
2660
2661 room_updated(&joined_room.room, &session.peer);
2662
2663 joined_room.into_inner()
2664 };
2665
2666 channel_updated(
2667 channel_id,
2668 &joined_room.room,
2669 &joined_room.channel_members,
2670 &session.peer,
2671 &*session.connection_pool().await,
2672 );
2673
2674 update_user_contacts(session.user_id, &session).await?;
2675
2676 Ok(())
2677}
2678
2679async fn join_channel_buffer(
2680 request: proto::JoinChannelBuffer,
2681 response: Response<proto::JoinChannelBuffer>,
2682 session: Session,
2683) -> Result<()> {
2684 let db = session.db().await;
2685 let channel_id = ChannelId::from_proto(request.channel_id);
2686
2687 let open_response = db
2688 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2689 .await?;
2690
2691 let collaborators = open_response.collaborators.clone();
2692 response.send(open_response)?;
2693
2694 let update = UpdateChannelBufferCollaborators {
2695 channel_id: channel_id.to_proto(),
2696 collaborators: collaborators.clone(),
2697 };
2698 channel_buffer_updated(
2699 session.connection_id,
2700 collaborators
2701 .iter()
2702 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2703 &update,
2704 &session.peer,
2705 );
2706
2707 Ok(())
2708}
2709
2710async fn update_channel_buffer(
2711 request: proto::UpdateChannelBuffer,
2712 session: Session,
2713) -> Result<()> {
2714 let db = session.db().await;
2715 let channel_id = ChannelId::from_proto(request.channel_id);
2716
2717 let (collaborators, non_collaborators, epoch, version) = db
2718 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2719 .await?;
2720
2721 channel_buffer_updated(
2722 session.connection_id,
2723 collaborators,
2724 &proto::UpdateChannelBuffer {
2725 channel_id: channel_id.to_proto(),
2726 operations: request.operations,
2727 },
2728 &session.peer,
2729 );
2730
2731 let pool = &*session.connection_pool().await;
2732
2733 broadcast(
2734 None,
2735 non_collaborators
2736 .iter()
2737 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2738 |peer_id| {
2739 session.peer.send(
2740 peer_id.into(),
2741 proto::UpdateChannels {
2742 unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
2743 channel_id: channel_id.to_proto(),
2744 epoch: epoch as u64,
2745 version: version.clone(),
2746 }],
2747 ..Default::default()
2748 },
2749 )
2750 },
2751 );
2752
2753 Ok(())
2754}
2755
2756async fn rejoin_channel_buffers(
2757 request: proto::RejoinChannelBuffers,
2758 response: Response<proto::RejoinChannelBuffers>,
2759 session: Session,
2760) -> Result<()> {
2761 let db = session.db().await;
2762 let buffers = db
2763 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2764 .await?;
2765
2766 for rejoined_buffer in &buffers {
2767 let collaborators_to_notify = rejoined_buffer
2768 .buffer
2769 .collaborators
2770 .iter()
2771 .filter_map(|c| Some(c.peer_id?.into()));
2772 channel_buffer_updated(
2773 session.connection_id,
2774 collaborators_to_notify,
2775 &proto::UpdateChannelBufferCollaborators {
2776 channel_id: rejoined_buffer.buffer.channel_id,
2777 collaborators: rejoined_buffer.buffer.collaborators.clone(),
2778 },
2779 &session.peer,
2780 );
2781 }
2782
2783 response.send(proto::RejoinChannelBuffersResponse {
2784 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2785 })?;
2786
2787 Ok(())
2788}
2789
2790async fn leave_channel_buffer(
2791 request: proto::LeaveChannelBuffer,
2792 response: Response<proto::LeaveChannelBuffer>,
2793 session: Session,
2794) -> Result<()> {
2795 let db = session.db().await;
2796 let channel_id = ChannelId::from_proto(request.channel_id);
2797
2798 let left_buffer = db
2799 .leave_channel_buffer(channel_id, session.connection_id)
2800 .await?;
2801
2802 response.send(Ack {})?;
2803
2804 channel_buffer_updated(
2805 session.connection_id,
2806 left_buffer.connections,
2807 &proto::UpdateChannelBufferCollaborators {
2808 channel_id: channel_id.to_proto(),
2809 collaborators: left_buffer.collaborators,
2810 },
2811 &session.peer,
2812 );
2813
2814 Ok(())
2815}
2816
2817fn channel_buffer_updated<T: EnvelopedMessage>(
2818 sender_id: ConnectionId,
2819 collaborators: impl IntoIterator<Item = ConnectionId>,
2820 message: &T,
2821 peer: &Peer,
2822) {
2823 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2824 peer.send(peer_id.into(), message.clone())
2825 });
2826}
2827
2828async fn send_channel_message(
2829 request: proto::SendChannelMessage,
2830 response: Response<proto::SendChannelMessage>,
2831 session: Session,
2832) -> Result<()> {
2833 // Validate the message body.
2834 let body = request.body.trim().to_string();
2835 if body.len() > MAX_MESSAGE_LEN {
2836 return Err(anyhow!("message is too long"))?;
2837 }
2838 if body.is_empty() {
2839 return Err(anyhow!("message can't be blank"))?;
2840 }
2841
2842 let timestamp = OffsetDateTime::now_utc();
2843 let nonce = request
2844 .nonce
2845 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2846
2847 let channel_id = ChannelId::from_proto(request.channel_id);
2848 let (message_id, connection_ids, non_participants) = session
2849 .db()
2850 .await
2851 .create_channel_message(
2852 channel_id,
2853 session.user_id,
2854 &body,
2855 timestamp,
2856 nonce.clone().into(),
2857 )
2858 .await?;
2859 let message = proto::ChannelMessage {
2860 sender_id: session.user_id.to_proto(),
2861 id: message_id.to_proto(),
2862 body,
2863 timestamp: timestamp.unix_timestamp() as u64,
2864 nonce: Some(nonce),
2865 };
2866 broadcast(Some(session.connection_id), connection_ids, |connection| {
2867 session.peer.send(
2868 connection,
2869 proto::ChannelMessageSent {
2870 channel_id: channel_id.to_proto(),
2871 message: Some(message.clone()),
2872 },
2873 )
2874 });
2875 response.send(proto::SendChannelMessageResponse {
2876 message: Some(message),
2877 })?;
2878
2879 let pool = &*session.connection_pool().await;
2880 broadcast(
2881 None,
2882 non_participants
2883 .iter()
2884 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2885 |peer_id| {
2886 session.peer.send(
2887 peer_id.into(),
2888 proto::UpdateChannels {
2889 unseen_channel_messages: vec![proto::UnseenChannelMessage {
2890 channel_id: channel_id.to_proto(),
2891 message_id: message_id.to_proto(),
2892 }],
2893 ..Default::default()
2894 },
2895 )
2896 },
2897 );
2898
2899 Ok(())
2900}
2901
2902async fn remove_channel_message(
2903 request: proto::RemoveChannelMessage,
2904 response: Response<proto::RemoveChannelMessage>,
2905 session: Session,
2906) -> Result<()> {
2907 let channel_id = ChannelId::from_proto(request.channel_id);
2908 let message_id = MessageId::from_proto(request.message_id);
2909 let connection_ids = session
2910 .db()
2911 .await
2912 .remove_channel_message(channel_id, message_id, session.user_id)
2913 .await?;
2914 broadcast(Some(session.connection_id), connection_ids, |connection| {
2915 session.peer.send(connection, request.clone())
2916 });
2917 response.send(proto::Ack {})?;
2918 Ok(())
2919}
2920
2921async fn acknowledge_channel_message(
2922 request: proto::AckChannelMessage,
2923 session: Session,
2924) -> Result<()> {
2925 let channel_id = ChannelId::from_proto(request.channel_id);
2926 let message_id = MessageId::from_proto(request.message_id);
2927 session
2928 .db()
2929 .await
2930 .observe_channel_message(channel_id, session.user_id, message_id)
2931 .await?;
2932 Ok(())
2933}
2934
2935async fn acknowledge_buffer_version(
2936 request: proto::AckBufferOperation,
2937 session: Session,
2938) -> Result<()> {
2939 let buffer_id = BufferId::from_proto(request.buffer_id);
2940 session
2941 .db()
2942 .await
2943 .observe_buffer_version(
2944 buffer_id,
2945 session.user_id,
2946 request.epoch as i32,
2947 &request.version,
2948 )
2949 .await?;
2950 Ok(())
2951}
2952
2953async fn join_channel_chat(
2954 request: proto::JoinChannelChat,
2955 response: Response<proto::JoinChannelChat>,
2956 session: Session,
2957) -> Result<()> {
2958 let channel_id = ChannelId::from_proto(request.channel_id);
2959
2960 let db = session.db().await;
2961 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2962 .await?;
2963 let messages = db
2964 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2965 .await?;
2966 response.send(proto::JoinChannelChatResponse {
2967 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2968 messages,
2969 })?;
2970 Ok(())
2971}
2972
2973async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2974 let channel_id = ChannelId::from_proto(request.channel_id);
2975 session
2976 .db()
2977 .await
2978 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2979 .await?;
2980 Ok(())
2981}
2982
2983async fn get_channel_messages(
2984 request: proto::GetChannelMessages,
2985 response: Response<proto::GetChannelMessages>,
2986 session: Session,
2987) -> Result<()> {
2988 let channel_id = ChannelId::from_proto(request.channel_id);
2989 let messages = session
2990 .db()
2991 .await
2992 .get_channel_messages(
2993 channel_id,
2994 session.user_id,
2995 MESSAGE_COUNT_PER_PAGE,
2996 Some(MessageId::from_proto(request.before_message_id)),
2997 )
2998 .await?;
2999 response.send(proto::GetChannelMessagesResponse {
3000 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3001 messages,
3002 })?;
3003 Ok(())
3004}
3005
3006async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
3007 let project_id = ProjectId::from_proto(request.project_id);
3008 let project_connection_ids = session
3009 .db()
3010 .await
3011 .project_connection_ids(project_id, session.connection_id)
3012 .await?;
3013 broadcast(
3014 Some(session.connection_id),
3015 project_connection_ids.iter().copied(),
3016 |connection_id| {
3017 session
3018 .peer
3019 .forward_send(session.connection_id, connection_id, request.clone())
3020 },
3021 );
3022 Ok(())
3023}
3024
3025async fn get_private_user_info(
3026 _request: proto::GetPrivateUserInfo,
3027 response: Response<proto::GetPrivateUserInfo>,
3028 session: Session,
3029) -> Result<()> {
3030 let db = session.db().await;
3031
3032 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
3033 let user = db
3034 .get_user_by_id(session.user_id)
3035 .await?
3036 .ok_or_else(|| anyhow!("user not found"))?;
3037 let flags = db.get_user_flags(session.user_id).await?;
3038
3039 response.send(proto::GetPrivateUserInfoResponse {
3040 metrics_id,
3041 staff: user.admin,
3042 flags,
3043 })?;
3044 Ok(())
3045}
3046
3047fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
3048 match message {
3049 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
3050 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
3051 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
3052 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
3053 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
3054 code: frame.code.into(),
3055 reason: frame.reason,
3056 })),
3057 }
3058}
3059
3060fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
3061 match message {
3062 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
3063 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
3064 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
3065 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
3066 AxumMessage::Close(frame) => {
3067 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
3068 code: frame.code.into(),
3069 reason: frame.reason,
3070 }))
3071 }
3072 }
3073}
3074
3075fn build_initial_channels_update(
3076 channels: ChannelsForUser,
3077 channel_invites: Vec<db::Channel>,
3078) -> proto::UpdateChannels {
3079 let mut update = proto::UpdateChannels::default();
3080
3081 for channel in channels.channels.channels {
3082 update.channels.push(proto::Channel {
3083 id: channel.id.to_proto(),
3084 name: channel.name,
3085 });
3086 }
3087
3088 update.unseen_channel_buffer_changes = channels.unseen_buffer_changes;
3089 update.unseen_channel_messages = channels.channel_messages;
3090 update.insert_edge = channels.channels.edges;
3091
3092 for (channel_id, participants) in channels.channel_participants {
3093 update
3094 .channel_participants
3095 .push(proto::ChannelParticipants {
3096 channel_id: channel_id.to_proto(),
3097 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
3098 });
3099 }
3100
3101 update
3102 .channel_permissions
3103 .extend(
3104 channels
3105 .channels_with_admin_privileges
3106 .into_iter()
3107 .map(|id| proto::ChannelPermission {
3108 channel_id: id.to_proto(),
3109 is_admin: true,
3110 }),
3111 );
3112
3113 for channel in channel_invites {
3114 update.channel_invitations.push(proto::Channel {
3115 id: channel.id.to_proto(),
3116 name: channel.name,
3117 });
3118 }
3119
3120 update
3121}
3122
3123fn build_initial_contacts_update(
3124 contacts: Vec<db::Contact>,
3125 pool: &ConnectionPool,
3126) -> proto::UpdateContacts {
3127 let mut update = proto::UpdateContacts::default();
3128
3129 for contact in contacts {
3130 match contact {
3131 db::Contact::Accepted {
3132 user_id,
3133 should_notify,
3134 busy,
3135 } => {
3136 update
3137 .contacts
3138 .push(contact_for_user(user_id, should_notify, busy, &pool));
3139 }
3140 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3141 db::Contact::Incoming {
3142 user_id,
3143 should_notify,
3144 } => update
3145 .incoming_requests
3146 .push(proto::IncomingContactRequest {
3147 requester_id: user_id.to_proto(),
3148 should_notify,
3149 }),
3150 }
3151 }
3152
3153 update
3154}
3155
3156fn contact_for_user(
3157 user_id: UserId,
3158 should_notify: bool,
3159 busy: bool,
3160 pool: &ConnectionPool,
3161) -> proto::Contact {
3162 proto::Contact {
3163 user_id: user_id.to_proto(),
3164 online: pool.is_user_online(user_id),
3165 busy,
3166 should_notify,
3167 }
3168}
3169
3170fn room_updated(room: &proto::Room, peer: &Peer) {
3171 broadcast(
3172 None,
3173 room.participants
3174 .iter()
3175 .filter_map(|participant| Some(participant.peer_id?.into())),
3176 |peer_id| {
3177 peer.send(
3178 peer_id.into(),
3179 proto::RoomUpdated {
3180 room: Some(room.clone()),
3181 },
3182 )
3183 },
3184 );
3185}
3186
3187fn channel_updated(
3188 channel_id: ChannelId,
3189 room: &proto::Room,
3190 channel_members: &[UserId],
3191 peer: &Peer,
3192 pool: &ConnectionPool,
3193) {
3194 let participants = room
3195 .participants
3196 .iter()
3197 .map(|p| p.user_id)
3198 .collect::<Vec<_>>();
3199
3200 broadcast(
3201 None,
3202 channel_members
3203 .iter()
3204 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3205 |peer_id| {
3206 peer.send(
3207 peer_id.into(),
3208 proto::UpdateChannels {
3209 channel_participants: vec![proto::ChannelParticipants {
3210 channel_id: channel_id.to_proto(),
3211 participant_user_ids: participants.clone(),
3212 }],
3213 ..Default::default()
3214 },
3215 )
3216 },
3217 );
3218}
3219
3220async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3221 let db = session.db().await;
3222
3223 let contacts = db.get_contacts(user_id).await?;
3224 let busy = db.is_user_busy(user_id).await?;
3225
3226 let pool = session.connection_pool().await;
3227 let updated_contact = contact_for_user(user_id, false, busy, &pool);
3228 for contact in contacts {
3229 if let db::Contact::Accepted {
3230 user_id: contact_user_id,
3231 ..
3232 } = contact
3233 {
3234 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3235 session
3236 .peer
3237 .send(
3238 contact_conn_id,
3239 proto::UpdateContacts {
3240 contacts: vec![updated_contact.clone()],
3241 remove_contacts: Default::default(),
3242 incoming_requests: Default::default(),
3243 remove_incoming_requests: Default::default(),
3244 outgoing_requests: Default::default(),
3245 remove_outgoing_requests: Default::default(),
3246 },
3247 )
3248 .trace_err();
3249 }
3250 }
3251 }
3252 Ok(())
3253}
3254
3255async fn leave_room_for_session(session: &Session) -> Result<()> {
3256 let mut contacts_to_update = HashSet::default();
3257
3258 let room_id;
3259 let canceled_calls_to_user_ids;
3260 let live_kit_room;
3261 let delete_live_kit_room;
3262 let room;
3263 let channel_members;
3264 let channel_id;
3265
3266 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3267 contacts_to_update.insert(session.user_id);
3268
3269 for project in left_room.left_projects.values() {
3270 project_left(project, session);
3271 }
3272
3273 room_id = RoomId::from_proto(left_room.room.id);
3274 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3275 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3276 delete_live_kit_room = left_room.deleted;
3277 room = mem::take(&mut left_room.room);
3278 channel_members = mem::take(&mut left_room.channel_members);
3279 channel_id = left_room.channel_id;
3280
3281 room_updated(&room, &session.peer);
3282 } else {
3283 return Ok(());
3284 }
3285
3286 if let Some(channel_id) = channel_id {
3287 channel_updated(
3288 channel_id,
3289 &room,
3290 &channel_members,
3291 &session.peer,
3292 &*session.connection_pool().await,
3293 );
3294 }
3295
3296 {
3297 let pool = session.connection_pool().await;
3298 for canceled_user_id in canceled_calls_to_user_ids {
3299 for connection_id in pool.user_connection_ids(canceled_user_id) {
3300 session
3301 .peer
3302 .send(
3303 connection_id,
3304 proto::CallCanceled {
3305 room_id: room_id.to_proto(),
3306 },
3307 )
3308 .trace_err();
3309 }
3310 contacts_to_update.insert(canceled_user_id);
3311 }
3312 }
3313
3314 for contact_user_id in contacts_to_update {
3315 update_user_contacts(contact_user_id, &session).await?;
3316 }
3317
3318 if let Some(live_kit) = session.live_kit_client.as_ref() {
3319 live_kit
3320 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3321 .await
3322 .trace_err();
3323
3324 if delete_live_kit_room {
3325 live_kit.delete_room(live_kit_room).await.trace_err();
3326 }
3327 }
3328
3329 Ok(())
3330}
3331
3332async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3333 let left_channel_buffers = session
3334 .db()
3335 .await
3336 .leave_channel_buffers(session.connection_id)
3337 .await?;
3338
3339 for left_buffer in left_channel_buffers {
3340 channel_buffer_updated(
3341 session.connection_id,
3342 left_buffer.connections,
3343 &proto::UpdateChannelBufferCollaborators {
3344 channel_id: left_buffer.channel_id.to_proto(),
3345 collaborators: left_buffer.collaborators,
3346 },
3347 &session.peer,
3348 );
3349 }
3350
3351 Ok(())
3352}
3353
3354fn project_left(project: &db::LeftProject, session: &Session) {
3355 for connection_id in &project.connection_ids {
3356 if project.host_user_id == session.user_id {
3357 session
3358 .peer
3359 .send(
3360 *connection_id,
3361 proto::UnshareProject {
3362 project_id: project.id.to_proto(),
3363 },
3364 )
3365 .trace_err();
3366 } else {
3367 session
3368 .peer
3369 .send(
3370 *connection_id,
3371 proto::RemoveProjectCollaborator {
3372 project_id: project.id.to_proto(),
3373 peer_id: Some(session.connection_id.into()),
3374 },
3375 )
3376 .trace_err();
3377 }
3378 }
3379}
3380
3381pub trait ResultExt {
3382 type Ok;
3383
3384 fn trace_err(self) -> Option<Self::Ok>;
3385}
3386
3387impl<T, E> ResultExt for Result<T, E>
3388where
3389 E: std::fmt::Debug,
3390{
3391 type Ok = T;
3392
3393 fn trace_err(self) -> Option<T> {
3394 match self {
3395 Ok(value) => Some(value),
3396 Err(error) => {
3397 tracing::error!("{:?}", error);
3398 None
3399 }
3400 }
3401 }
3402}