1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, BufferId, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId,
7 ServerId, User, UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AnyTypedEnvelope, ChannelEdge, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage, UpdateChannelBufferCollaborators,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66use util::channel::RELEASE_CHANNEL_NAME;
67
68pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
69pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
74
75lazy_static! {
76 static ref METRIC_CONNECTIONS: IntGauge =
77 register_int_gauge!("connections", "number of connections").unwrap();
78 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
79 "shared_projects",
80 "number of open projects with one or more guests"
81 )
82 .unwrap();
83}
84
85type MessageHandler =
86 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
87
88struct Response<R> {
89 peer: Arc<Peer>,
90 receipt: Receipt<R>,
91 responded: Arc<AtomicBool>,
92}
93
94impl<R: RequestMessage> Response<R> {
95 fn send(self, payload: R::Response) -> Result<()> {
96 self.responded.store(true, SeqCst);
97 self.peer.respond(self.receipt, payload)?;
98 Ok(())
99 }
100}
101
102#[derive(Clone)]
103struct Session {
104 user_id: UserId,
105 connection_id: ConnectionId,
106 db: Arc<tokio::sync::Mutex<DbHandle>>,
107 peer: Arc<Peer>,
108 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
109 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
110 executor: Executor,
111}
112
113impl Session {
114 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
115 #[cfg(test)]
116 tokio::task::yield_now().await;
117 let guard = self.db.lock().await;
118 #[cfg(test)]
119 tokio::task::yield_now().await;
120 guard
121 }
122
123 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
124 #[cfg(test)]
125 tokio::task::yield_now().await;
126 let guard = self.connection_pool.lock();
127 ConnectionPoolGuard {
128 guard,
129 _not_send: PhantomData,
130 }
131 }
132}
133
134impl fmt::Debug for Session {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("Session")
137 .field("user_id", &self.user_id)
138 .field("connection_id", &self.connection_id)
139 .finish()
140 }
141}
142
143struct DbHandle(Arc<Database>);
144
145impl Deref for DbHandle {
146 type Target = Database;
147
148 fn deref(&self) -> &Self::Target {
149 self.0.as_ref()
150 }
151}
152
153pub struct Server {
154 id: parking_lot::Mutex<ServerId>,
155 peer: Arc<Peer>,
156 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
157 app_state: Arc<AppState>,
158 executor: Executor,
159 handlers: HashMap<TypeId, MessageHandler>,
160 teardown: watch::Sender<()>,
161}
162
163pub(crate) struct ConnectionPoolGuard<'a> {
164 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
165 _not_send: PhantomData<Rc<()>>,
166}
167
168#[derive(Serialize)]
169pub struct ServerSnapshot<'a> {
170 peer: &'a Peer,
171 #[serde(serialize_with = "serialize_deref")]
172 connection_pool: ConnectionPoolGuard<'a>,
173}
174
175pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
176where
177 S: Serializer,
178 T: Deref<Target = U>,
179 U: Serialize,
180{
181 Serialize::serialize(value.deref(), serializer)
182}
183
184impl Server {
185 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
186 let mut server = Self {
187 id: parking_lot::Mutex::new(id),
188 peer: Peer::new(id.0 as u32),
189 app_state,
190 executor,
191 connection_pool: Default::default(),
192 handlers: Default::default(),
193 teardown: watch::channel(()).0,
194 };
195
196 server
197 .add_request_handler(ping)
198 .add_request_handler(create_room)
199 .add_request_handler(join_room)
200 .add_request_handler(rejoin_room)
201 .add_request_handler(leave_room)
202 .add_request_handler(call)
203 .add_request_handler(cancel_call)
204 .add_message_handler(decline_call)
205 .add_request_handler(update_participant_location)
206 .add_request_handler(share_project)
207 .add_message_handler(unshare_project)
208 .add_request_handler(join_project)
209 .add_message_handler(leave_project)
210 .add_request_handler(update_project)
211 .add_request_handler(update_worktree)
212 .add_message_handler(start_language_server)
213 .add_message_handler(update_language_server)
214 .add_message_handler(update_diagnostic_summary)
215 .add_message_handler(update_worktree_settings)
216 .add_message_handler(refresh_inlay_hints)
217 .add_request_handler(forward_project_request::<proto::GetHover>)
218 .add_request_handler(forward_project_request::<proto::GetDefinition>)
219 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
220 .add_request_handler(forward_project_request::<proto::GetReferences>)
221 .add_request_handler(forward_project_request::<proto::SearchProject>)
222 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
223 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
225 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
226 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
227 .add_request_handler(forward_project_request::<proto::GetCompletions>)
228 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
229 .add_request_handler(forward_project_request::<proto::ResolveCompletionDocumentation>)
230 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
231 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
232 .add_request_handler(forward_project_request::<proto::PrepareRename>)
233 .add_request_handler(forward_project_request::<proto::PerformRename>)
234 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
235 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
236 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
237 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
240 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
241 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
242 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
243 .add_request_handler(forward_project_request::<proto::InlayHints>)
244 .add_message_handler(create_buffer_for_peer)
245 .add_request_handler(update_buffer)
246 .add_message_handler(update_buffer_file)
247 .add_message_handler(buffer_reloaded)
248 .add_message_handler(buffer_saved)
249 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
250 .add_request_handler(get_users)
251 .add_request_handler(fuzzy_search_users)
252 .add_request_handler(request_contact)
253 .add_request_handler(remove_contact)
254 .add_request_handler(respond_to_contact_request)
255 .add_request_handler(create_channel)
256 .add_request_handler(delete_channel)
257 .add_request_handler(invite_channel_member)
258 .add_request_handler(remove_channel_member)
259 .add_request_handler(set_channel_member_admin)
260 .add_request_handler(rename_channel)
261 .add_request_handler(join_channel_buffer)
262 .add_request_handler(leave_channel_buffer)
263 .add_message_handler(update_channel_buffer)
264 .add_request_handler(rejoin_channel_buffers)
265 .add_request_handler(get_channel_members)
266 .add_request_handler(respond_to_channel_invite)
267 .add_request_handler(join_channel)
268 .add_request_handler(join_channel_chat)
269 .add_message_handler(leave_channel_chat)
270 .add_request_handler(send_channel_message)
271 .add_request_handler(remove_channel_message)
272 .add_request_handler(get_channel_messages)
273 .add_request_handler(get_notifications)
274 .add_request_handler(link_channel)
275 .add_request_handler(unlink_channel)
276 .add_request_handler(move_channel)
277 .add_request_handler(follow)
278 .add_message_handler(unfollow)
279 .add_message_handler(update_followers)
280 .add_message_handler(update_diff_base)
281 .add_request_handler(get_private_user_info)
282 .add_message_handler(acknowledge_channel_message)
283 .add_message_handler(acknowledge_buffer_version);
284
285 Arc::new(server)
286 }
287
288 pub async fn start(&self) -> Result<()> {
289 let server_id = *self.id.lock();
290 let app_state = self.app_state.clone();
291 let peer = self.peer.clone();
292 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
293 let pool = self.connection_pool.clone();
294 let live_kit_client = self.app_state.live_kit_client.clone();
295
296 let span = info_span!("start server");
297 self.executor.spawn_detached(
298 async move {
299 tracing::info!("waiting for cleanup timeout");
300 timeout.await;
301 tracing::info!("cleanup timeout expired, retrieving stale rooms");
302 if let Some((room_ids, channel_ids)) = app_state
303 .db
304 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
305 .await
306 .trace_err()
307 {
308 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
309 tracing::info!(
310 stale_channel_buffer_count = channel_ids.len(),
311 "retrieved stale channel buffers"
312 );
313
314 for channel_id in channel_ids {
315 if let Some(refreshed_channel_buffer) = app_state
316 .db
317 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
318 .await
319 .trace_err()
320 {
321 for connection_id in refreshed_channel_buffer.connection_ids {
322 peer.send(
323 connection_id,
324 proto::UpdateChannelBufferCollaborators {
325 channel_id: channel_id.to_proto(),
326 collaborators: refreshed_channel_buffer
327 .collaborators
328 .clone(),
329 },
330 )
331 .trace_err();
332 }
333 }
334 }
335
336 for room_id in room_ids {
337 let mut contacts_to_update = HashSet::default();
338 let mut canceled_calls_to_user_ids = Vec::new();
339 let mut live_kit_room = String::new();
340 let mut delete_live_kit_room = false;
341
342 if let Some(mut refreshed_room) = app_state
343 .db
344 .clear_stale_room_participants(room_id, server_id)
345 .await
346 .trace_err()
347 {
348 tracing::info!(
349 room_id = room_id.0,
350 new_participant_count = refreshed_room.room.participants.len(),
351 "refreshed room"
352 );
353 room_updated(&refreshed_room.room, &peer);
354 if let Some(channel_id) = refreshed_room.channel_id {
355 channel_updated(
356 channel_id,
357 &refreshed_room.room,
358 &refreshed_room.channel_members,
359 &peer,
360 &*pool.lock(),
361 );
362 }
363 contacts_to_update
364 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
365 contacts_to_update
366 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
367 canceled_calls_to_user_ids =
368 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
369 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
370 delete_live_kit_room = refreshed_room.room.participants.is_empty();
371 }
372
373 {
374 let pool = pool.lock();
375 for canceled_user_id in canceled_calls_to_user_ids {
376 for connection_id in pool.user_connection_ids(canceled_user_id) {
377 peer.send(
378 connection_id,
379 proto::CallCanceled {
380 room_id: room_id.to_proto(),
381 },
382 )
383 .trace_err();
384 }
385 }
386 }
387
388 for user_id in contacts_to_update {
389 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
390 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
391 if let Some((busy, contacts)) = busy.zip(contacts) {
392 let pool = pool.lock();
393 let updated_contact = contact_for_user(user_id, busy, &pool);
394 for contact in contacts {
395 if let db::Contact::Accepted {
396 user_id: contact_user_id,
397 ..
398 } = contact
399 {
400 for contact_conn_id in
401 pool.user_connection_ids(contact_user_id)
402 {
403 peer.send(
404 contact_conn_id,
405 proto::UpdateContacts {
406 contacts: vec![updated_contact.clone()],
407 remove_contacts: Default::default(),
408 incoming_requests: Default::default(),
409 remove_incoming_requests: Default::default(),
410 outgoing_requests: Default::default(),
411 remove_outgoing_requests: Default::default(),
412 },
413 )
414 .trace_err();
415 }
416 }
417 }
418 }
419 }
420
421 if let Some(live_kit) = live_kit_client.as_ref() {
422 if delete_live_kit_room {
423 live_kit.delete_room(live_kit_room).await.trace_err();
424 }
425 }
426 }
427 }
428
429 app_state
430 .db
431 .delete_stale_servers(&app_state.config.zed_environment, server_id)
432 .await
433 .trace_err();
434 }
435 .instrument(span),
436 );
437 Ok(())
438 }
439
440 pub fn teardown(&self) {
441 self.peer.teardown();
442 self.connection_pool.lock().reset();
443 let _ = self.teardown.send(());
444 }
445
446 #[cfg(test)]
447 pub fn reset(&self, id: ServerId) {
448 self.teardown();
449 *self.id.lock() = id;
450 self.peer.reset(id.0 as u32);
451 }
452
453 #[cfg(test)]
454 pub fn id(&self) -> ServerId {
455 *self.id.lock()
456 }
457
458 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
459 where
460 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
461 Fut: 'static + Send + Future<Output = Result<()>>,
462 M: EnvelopedMessage,
463 {
464 let prev_handler = self.handlers.insert(
465 TypeId::of::<M>(),
466 Box::new(move |envelope, session| {
467 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
468 let span = info_span!(
469 "handle message",
470 payload_type = envelope.payload_type_name()
471 );
472 span.in_scope(|| {
473 tracing::info!(
474 payload_type = envelope.payload_type_name(),
475 "message received"
476 );
477 });
478 let start_time = Instant::now();
479 let future = (handler)(*envelope, session);
480 async move {
481 let result = future.await;
482 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
483 match result {
484 Err(error) => {
485 tracing::error!(%error, ?duration_ms, "error handling message")
486 }
487 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
488 }
489 }
490 .instrument(span)
491 .boxed()
492 }),
493 );
494 if prev_handler.is_some() {
495 panic!("registered a handler for the same message twice");
496 }
497 self
498 }
499
500 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
501 where
502 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
503 Fut: 'static + Send + Future<Output = Result<()>>,
504 M: EnvelopedMessage,
505 {
506 self.add_handler(move |envelope, session| handler(envelope.payload, session));
507 self
508 }
509
510 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
511 where
512 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
513 Fut: Send + Future<Output = Result<()>>,
514 M: RequestMessage,
515 {
516 let handler = Arc::new(handler);
517 self.add_handler(move |envelope, session| {
518 let receipt = envelope.receipt();
519 let handler = handler.clone();
520 async move {
521 let peer = session.peer.clone();
522 let responded = Arc::new(AtomicBool::default());
523 let response = Response {
524 peer: peer.clone(),
525 responded: responded.clone(),
526 receipt,
527 };
528 match (handler)(envelope.payload, response, session).await {
529 Ok(()) => {
530 if responded.load(std::sync::atomic::Ordering::SeqCst) {
531 Ok(())
532 } else {
533 Err(anyhow!("handler did not send a response"))?
534 }
535 }
536 Err(error) => {
537 peer.respond_with_error(
538 receipt,
539 proto::Error {
540 message: error.to_string(),
541 },
542 )?;
543 Err(error)
544 }
545 }
546 }
547 })
548 }
549
550 pub fn handle_connection(
551 self: &Arc<Self>,
552 connection: Connection,
553 address: String,
554 user: User,
555 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
556 executor: Executor,
557 ) -> impl Future<Output = Result<()>> {
558 let this = self.clone();
559 let user_id = user.id;
560 let login = user.github_login;
561 let span = info_span!("handle connection", %user_id, %login, %address);
562 let mut teardown = self.teardown.subscribe();
563 async move {
564 let (connection_id, handle_io, mut incoming_rx) = this
565 .peer
566 .add_connection(connection, {
567 let executor = executor.clone();
568 move |duration| executor.sleep(duration)
569 });
570
571 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
572 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
573 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
574
575 if let Some(send_connection_id) = send_connection_id.take() {
576 let _ = send_connection_id.send(connection_id);
577 }
578
579 if !user.connected_once {
580 this.peer.send(connection_id, proto::ShowContacts {})?;
581 this.app_state.db.set_user_connected_once(user_id, true).await?;
582 }
583
584 let (contacts, channels_for_user, channel_invites) = future::try_join3(
585 this.app_state.db.get_contacts(user_id),
586 this.app_state.db.get_channels_for_user(user_id),
587 this.app_state.db.get_channel_invites_for_user(user_id),
588 ).await?;
589
590 {
591 let mut pool = this.connection_pool.lock();
592 pool.add_connection(connection_id, user_id, user.admin);
593 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
594 this.peer.send(connection_id, build_initial_channels_update(
595 channels_for_user,
596 channel_invites
597 ))?;
598 }
599
600 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
601 this.peer.send(connection_id, incoming_call)?;
602 }
603
604 let session = Session {
605 user_id,
606 connection_id,
607 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
608 peer: this.peer.clone(),
609 connection_pool: this.connection_pool.clone(),
610 live_kit_client: this.app_state.live_kit_client.clone(),
611 executor: executor.clone(),
612 };
613 update_user_contacts(user_id, &session).await?;
614
615 let handle_io = handle_io.fuse();
616 futures::pin_mut!(handle_io);
617
618 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
619 // This prevents deadlocks when e.g., client A performs a request to client B and
620 // client B performs a request to client A. If both clients stop processing further
621 // messages until their respective request completes, they won't have a chance to
622 // respond to the other client's request and cause a deadlock.
623 //
624 // This arrangement ensures we will attempt to process earlier messages first, but fall
625 // back to processing messages arrived later in the spirit of making progress.
626 let mut foreground_message_handlers = FuturesUnordered::new();
627 let concurrent_handlers = Arc::new(Semaphore::new(256));
628 loop {
629 let next_message = async {
630 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
631 let message = incoming_rx.next().await;
632 (permit, message)
633 }.fuse();
634 futures::pin_mut!(next_message);
635 futures::select_biased! {
636 _ = teardown.changed().fuse() => return Ok(()),
637 result = handle_io => {
638 if let Err(error) = result {
639 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
640 }
641 break;
642 }
643 _ = foreground_message_handlers.next() => {}
644 next_message = next_message => {
645 let (permit, message) = next_message;
646 if let Some(message) = message {
647 let type_name = message.payload_type_name();
648 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
649 let span_enter = span.enter();
650 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
651 let is_background = message.is_background();
652 let handle_message = (handler)(message, session.clone());
653 drop(span_enter);
654
655 let handle_message = async move {
656 handle_message.await;
657 drop(permit);
658 }.instrument(span);
659 if is_background {
660 executor.spawn_detached(handle_message);
661 } else {
662 foreground_message_handlers.push(handle_message);
663 }
664 } else {
665 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
666 }
667 } else {
668 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
669 break;
670 }
671 }
672 }
673 }
674
675 drop(foreground_message_handlers);
676 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
677 if let Err(error) = connection_lost(session, teardown, executor).await {
678 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
679 }
680
681 Ok(())
682 }.instrument(span)
683 }
684
685 pub async fn invite_code_redeemed(
686 self: &Arc<Self>,
687 inviter_id: UserId,
688 invitee_id: UserId,
689 ) -> Result<()> {
690 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
691 if let Some(code) = &user.invite_code {
692 let pool = self.connection_pool.lock();
693 let invitee_contact = contact_for_user(invitee_id, false, &pool);
694 for connection_id in pool.user_connection_ids(inviter_id) {
695 self.peer.send(
696 connection_id,
697 proto::UpdateContacts {
698 contacts: vec![invitee_contact.clone()],
699 ..Default::default()
700 },
701 )?;
702 self.peer.send(
703 connection_id,
704 proto::UpdateInviteInfo {
705 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
706 count: user.invite_count as u32,
707 },
708 )?;
709 }
710 }
711 }
712 Ok(())
713 }
714
715 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
716 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
717 if let Some(invite_code) = &user.invite_code {
718 let pool = self.connection_pool.lock();
719 for connection_id in pool.user_connection_ids(user_id) {
720 self.peer.send(
721 connection_id,
722 proto::UpdateInviteInfo {
723 url: format!(
724 "{}{}",
725 self.app_state.config.invite_link_prefix, invite_code
726 ),
727 count: user.invite_count as u32,
728 },
729 )?;
730 }
731 }
732 }
733 Ok(())
734 }
735
736 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
737 ServerSnapshot {
738 connection_pool: ConnectionPoolGuard {
739 guard: self.connection_pool.lock(),
740 _not_send: PhantomData,
741 },
742 peer: &self.peer,
743 }
744 }
745}
746
747impl<'a> Deref for ConnectionPoolGuard<'a> {
748 type Target = ConnectionPool;
749
750 fn deref(&self) -> &Self::Target {
751 &*self.guard
752 }
753}
754
755impl<'a> DerefMut for ConnectionPoolGuard<'a> {
756 fn deref_mut(&mut self) -> &mut Self::Target {
757 &mut *self.guard
758 }
759}
760
761impl<'a> Drop for ConnectionPoolGuard<'a> {
762 fn drop(&mut self) {
763 #[cfg(test)]
764 self.check_invariants();
765 }
766}
767
768fn broadcast<F>(
769 sender_id: Option<ConnectionId>,
770 receiver_ids: impl IntoIterator<Item = ConnectionId>,
771 mut f: F,
772) where
773 F: FnMut(ConnectionId) -> anyhow::Result<()>,
774{
775 for receiver_id in receiver_ids {
776 if Some(receiver_id) != sender_id {
777 if let Err(error) = f(receiver_id) {
778 tracing::error!("failed to send to {:?} {}", receiver_id, error);
779 }
780 }
781 }
782}
783
784lazy_static! {
785 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
786}
787
788pub struct ProtocolVersion(u32);
789
790impl Header for ProtocolVersion {
791 fn name() -> &'static HeaderName {
792 &ZED_PROTOCOL_VERSION
793 }
794
795 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
796 where
797 Self: Sized,
798 I: Iterator<Item = &'i axum::http::HeaderValue>,
799 {
800 let version = values
801 .next()
802 .ok_or_else(axum::headers::Error::invalid)?
803 .to_str()
804 .map_err(|_| axum::headers::Error::invalid())?
805 .parse()
806 .map_err(|_| axum::headers::Error::invalid())?;
807 Ok(Self(version))
808 }
809
810 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
811 values.extend([self.0.to_string().parse().unwrap()]);
812 }
813}
814
815pub fn routes(server: Arc<Server>) -> Router<Body> {
816 Router::new()
817 .route("/rpc", get(handle_websocket_request))
818 .layer(
819 ServiceBuilder::new()
820 .layer(Extension(server.app_state.clone()))
821 .layer(middleware::from_fn(auth::validate_header)),
822 )
823 .route("/metrics", get(handle_metrics))
824 .layer(Extension(server))
825}
826
827pub async fn handle_websocket_request(
828 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
829 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
830 Extension(server): Extension<Arc<Server>>,
831 Extension(user): Extension<User>,
832 ws: WebSocketUpgrade,
833) -> axum::response::Response {
834 if protocol_version != rpc::PROTOCOL_VERSION {
835 return (
836 StatusCode::UPGRADE_REQUIRED,
837 "client must be upgraded".to_string(),
838 )
839 .into_response();
840 }
841 let socket_address = socket_address.to_string();
842 ws.on_upgrade(move |socket| {
843 use util::ResultExt;
844 let socket = socket
845 .map_ok(to_tungstenite_message)
846 .err_into()
847 .with(|message| async move { Ok(to_axum_message(message)) });
848 let connection = Connection::new(Box::pin(socket));
849 async move {
850 server
851 .handle_connection(connection, socket_address, user, None, Executor::Production)
852 .await
853 .log_err();
854 }
855 })
856}
857
858pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
859 let connections = server
860 .connection_pool
861 .lock()
862 .connections()
863 .filter(|connection| !connection.admin)
864 .count();
865
866 METRIC_CONNECTIONS.set(connections as _);
867
868 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
869 METRIC_SHARED_PROJECTS.set(shared_projects as _);
870
871 let encoder = prometheus::TextEncoder::new();
872 let metric_families = prometheus::gather();
873 let encoded_metrics = encoder
874 .encode_to_string(&metric_families)
875 .map_err(|err| anyhow!("{}", err))?;
876 Ok(encoded_metrics)
877}
878
879#[instrument(err, skip(executor))]
880async fn connection_lost(
881 session: Session,
882 mut teardown: watch::Receiver<()>,
883 executor: Executor,
884) -> Result<()> {
885 session.peer.disconnect(session.connection_id);
886 session
887 .connection_pool()
888 .await
889 .remove_connection(session.connection_id)?;
890
891 session
892 .db()
893 .await
894 .connection_lost(session.connection_id)
895 .await
896 .trace_err();
897
898 futures::select_biased! {
899 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
900 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
901 leave_room_for_session(&session).await.trace_err();
902 leave_channel_buffers_for_session(&session)
903 .await
904 .trace_err();
905
906 if !session
907 .connection_pool()
908 .await
909 .is_user_online(session.user_id)
910 {
911 let db = session.db().await;
912 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
913 room_updated(&room, &session.peer);
914 }
915 }
916
917 update_user_contacts(session.user_id, &session).await?;
918 }
919 _ = teardown.changed().fuse() => {}
920 }
921
922 Ok(())
923}
924
925async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
926 response.send(proto::Ack {})?;
927 Ok(())
928}
929
930async fn create_room(
931 _request: proto::CreateRoom,
932 response: Response<proto::CreateRoom>,
933 session: Session,
934) -> Result<()> {
935 let live_kit_room = nanoid::nanoid!(30);
936
937 let live_kit_connection_info = {
938 let live_kit_room = live_kit_room.clone();
939 let live_kit = session.live_kit_client.as_ref();
940
941 util::async_iife!({
942 let live_kit = live_kit?;
943
944 let token = live_kit
945 .room_token(&live_kit_room, &session.user_id.to_string())
946 .trace_err()?;
947
948 Some(proto::LiveKitConnectionInfo {
949 server_url: live_kit.url().into(),
950 token,
951 })
952 })
953 }
954 .await;
955
956 let room = session
957 .db()
958 .await
959 .create_room(
960 session.user_id,
961 session.connection_id,
962 &live_kit_room,
963 RELEASE_CHANNEL_NAME.as_str(),
964 )
965 .await?;
966
967 response.send(proto::CreateRoomResponse {
968 room: Some(room.clone()),
969 live_kit_connection_info,
970 })?;
971
972 update_user_contacts(session.user_id, &session).await?;
973 Ok(())
974}
975
976async fn join_room(
977 request: proto::JoinRoom,
978 response: Response<proto::JoinRoom>,
979 session: Session,
980) -> Result<()> {
981 let room_id = RoomId::from_proto(request.id);
982 let joined_room = {
983 let room = session
984 .db()
985 .await
986 .join_room(
987 room_id,
988 session.user_id,
989 session.connection_id,
990 RELEASE_CHANNEL_NAME.as_str(),
991 )
992 .await?;
993 room_updated(&room.room, &session.peer);
994 room.into_inner()
995 };
996
997 if let Some(channel_id) = joined_room.channel_id {
998 channel_updated(
999 channel_id,
1000 &joined_room.room,
1001 &joined_room.channel_members,
1002 &session.peer,
1003 &*session.connection_pool().await,
1004 )
1005 }
1006
1007 for connection_id in session
1008 .connection_pool()
1009 .await
1010 .user_connection_ids(session.user_id)
1011 {
1012 session
1013 .peer
1014 .send(
1015 connection_id,
1016 proto::CallCanceled {
1017 room_id: room_id.to_proto(),
1018 },
1019 )
1020 .trace_err();
1021 }
1022
1023 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1024 if let Some(token) = live_kit
1025 .room_token(
1026 &joined_room.room.live_kit_room,
1027 &session.user_id.to_string(),
1028 )
1029 .trace_err()
1030 {
1031 Some(proto::LiveKitConnectionInfo {
1032 server_url: live_kit.url().into(),
1033 token,
1034 })
1035 } else {
1036 None
1037 }
1038 } else {
1039 None
1040 };
1041
1042 response.send(proto::JoinRoomResponse {
1043 room: Some(joined_room.room),
1044 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1045 live_kit_connection_info,
1046 })?;
1047
1048 update_user_contacts(session.user_id, &session).await?;
1049 Ok(())
1050}
1051
1052async fn rejoin_room(
1053 request: proto::RejoinRoom,
1054 response: Response<proto::RejoinRoom>,
1055 session: Session,
1056) -> Result<()> {
1057 let room;
1058 let channel_id;
1059 let channel_members;
1060 {
1061 let mut rejoined_room = session
1062 .db()
1063 .await
1064 .rejoin_room(request, session.user_id, session.connection_id)
1065 .await?;
1066
1067 response.send(proto::RejoinRoomResponse {
1068 room: Some(rejoined_room.room.clone()),
1069 reshared_projects: rejoined_room
1070 .reshared_projects
1071 .iter()
1072 .map(|project| proto::ResharedProject {
1073 id: project.id.to_proto(),
1074 collaborators: project
1075 .collaborators
1076 .iter()
1077 .map(|collaborator| collaborator.to_proto())
1078 .collect(),
1079 })
1080 .collect(),
1081 rejoined_projects: rejoined_room
1082 .rejoined_projects
1083 .iter()
1084 .map(|rejoined_project| proto::RejoinedProject {
1085 id: rejoined_project.id.to_proto(),
1086 worktrees: rejoined_project
1087 .worktrees
1088 .iter()
1089 .map(|worktree| proto::WorktreeMetadata {
1090 id: worktree.id,
1091 root_name: worktree.root_name.clone(),
1092 visible: worktree.visible,
1093 abs_path: worktree.abs_path.clone(),
1094 })
1095 .collect(),
1096 collaborators: rejoined_project
1097 .collaborators
1098 .iter()
1099 .map(|collaborator| collaborator.to_proto())
1100 .collect(),
1101 language_servers: rejoined_project.language_servers.clone(),
1102 })
1103 .collect(),
1104 })?;
1105 room_updated(&rejoined_room.room, &session.peer);
1106
1107 for project in &rejoined_room.reshared_projects {
1108 for collaborator in &project.collaborators {
1109 session
1110 .peer
1111 .send(
1112 collaborator.connection_id,
1113 proto::UpdateProjectCollaborator {
1114 project_id: project.id.to_proto(),
1115 old_peer_id: Some(project.old_connection_id.into()),
1116 new_peer_id: Some(session.connection_id.into()),
1117 },
1118 )
1119 .trace_err();
1120 }
1121
1122 broadcast(
1123 Some(session.connection_id),
1124 project
1125 .collaborators
1126 .iter()
1127 .map(|collaborator| collaborator.connection_id),
1128 |connection_id| {
1129 session.peer.forward_send(
1130 session.connection_id,
1131 connection_id,
1132 proto::UpdateProject {
1133 project_id: project.id.to_proto(),
1134 worktrees: project.worktrees.clone(),
1135 },
1136 )
1137 },
1138 );
1139 }
1140
1141 for project in &rejoined_room.rejoined_projects {
1142 for collaborator in &project.collaborators {
1143 session
1144 .peer
1145 .send(
1146 collaborator.connection_id,
1147 proto::UpdateProjectCollaborator {
1148 project_id: project.id.to_proto(),
1149 old_peer_id: Some(project.old_connection_id.into()),
1150 new_peer_id: Some(session.connection_id.into()),
1151 },
1152 )
1153 .trace_err();
1154 }
1155 }
1156
1157 for project in &mut rejoined_room.rejoined_projects {
1158 for worktree in mem::take(&mut project.worktrees) {
1159 #[cfg(any(test, feature = "test-support"))]
1160 const MAX_CHUNK_SIZE: usize = 2;
1161 #[cfg(not(any(test, feature = "test-support")))]
1162 const MAX_CHUNK_SIZE: usize = 256;
1163
1164 // Stream this worktree's entries.
1165 let message = proto::UpdateWorktree {
1166 project_id: project.id.to_proto(),
1167 worktree_id: worktree.id,
1168 abs_path: worktree.abs_path.clone(),
1169 root_name: worktree.root_name,
1170 updated_entries: worktree.updated_entries,
1171 removed_entries: worktree.removed_entries,
1172 scan_id: worktree.scan_id,
1173 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1174 updated_repositories: worktree.updated_repositories,
1175 removed_repositories: worktree.removed_repositories,
1176 };
1177 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1178 session.peer.send(session.connection_id, update.clone())?;
1179 }
1180
1181 // Stream this worktree's diagnostics.
1182 for summary in worktree.diagnostic_summaries {
1183 session.peer.send(
1184 session.connection_id,
1185 proto::UpdateDiagnosticSummary {
1186 project_id: project.id.to_proto(),
1187 worktree_id: worktree.id,
1188 summary: Some(summary),
1189 },
1190 )?;
1191 }
1192
1193 for settings_file in worktree.settings_files {
1194 session.peer.send(
1195 session.connection_id,
1196 proto::UpdateWorktreeSettings {
1197 project_id: project.id.to_proto(),
1198 worktree_id: worktree.id,
1199 path: settings_file.path,
1200 content: Some(settings_file.content),
1201 },
1202 )?;
1203 }
1204 }
1205
1206 for language_server in &project.language_servers {
1207 session.peer.send(
1208 session.connection_id,
1209 proto::UpdateLanguageServer {
1210 project_id: project.id.to_proto(),
1211 language_server_id: language_server.id,
1212 variant: Some(
1213 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1214 proto::LspDiskBasedDiagnosticsUpdated {},
1215 ),
1216 ),
1217 },
1218 )?;
1219 }
1220 }
1221
1222 let rejoined_room = rejoined_room.into_inner();
1223
1224 room = rejoined_room.room;
1225 channel_id = rejoined_room.channel_id;
1226 channel_members = rejoined_room.channel_members;
1227 }
1228
1229 if let Some(channel_id) = channel_id {
1230 channel_updated(
1231 channel_id,
1232 &room,
1233 &channel_members,
1234 &session.peer,
1235 &*session.connection_pool().await,
1236 );
1237 }
1238
1239 update_user_contacts(session.user_id, &session).await?;
1240 Ok(())
1241}
1242
1243async fn leave_room(
1244 _: proto::LeaveRoom,
1245 response: Response<proto::LeaveRoom>,
1246 session: Session,
1247) -> Result<()> {
1248 leave_room_for_session(&session).await?;
1249 response.send(proto::Ack {})?;
1250 Ok(())
1251}
1252
1253async fn call(
1254 request: proto::Call,
1255 response: Response<proto::Call>,
1256 session: Session,
1257) -> Result<()> {
1258 let room_id = RoomId::from_proto(request.room_id);
1259 let calling_user_id = session.user_id;
1260 let calling_connection_id = session.connection_id;
1261 let called_user_id = UserId::from_proto(request.called_user_id);
1262 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1263 if !session
1264 .db()
1265 .await
1266 .has_contact(calling_user_id, called_user_id)
1267 .await?
1268 {
1269 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1270 }
1271
1272 let incoming_call = {
1273 let (room, incoming_call) = &mut *session
1274 .db()
1275 .await
1276 .call(
1277 room_id,
1278 calling_user_id,
1279 calling_connection_id,
1280 called_user_id,
1281 initial_project_id,
1282 )
1283 .await?;
1284 room_updated(&room, &session.peer);
1285 mem::take(incoming_call)
1286 };
1287 update_user_contacts(called_user_id, &session).await?;
1288
1289 let mut calls = session
1290 .connection_pool()
1291 .await
1292 .user_connection_ids(called_user_id)
1293 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1294 .collect::<FuturesUnordered<_>>();
1295
1296 while let Some(call_response) = calls.next().await {
1297 match call_response.as_ref() {
1298 Ok(_) => {
1299 response.send(proto::Ack {})?;
1300 return Ok(());
1301 }
1302 Err(_) => {
1303 call_response.trace_err();
1304 }
1305 }
1306 }
1307
1308 {
1309 let room = session
1310 .db()
1311 .await
1312 .call_failed(room_id, called_user_id)
1313 .await?;
1314 room_updated(&room, &session.peer);
1315 }
1316 update_user_contacts(called_user_id, &session).await?;
1317
1318 Err(anyhow!("failed to ring user"))?
1319}
1320
1321async fn cancel_call(
1322 request: proto::CancelCall,
1323 response: Response<proto::CancelCall>,
1324 session: Session,
1325) -> Result<()> {
1326 let called_user_id = UserId::from_proto(request.called_user_id);
1327 let room_id = RoomId::from_proto(request.room_id);
1328 {
1329 let room = session
1330 .db()
1331 .await
1332 .cancel_call(room_id, session.connection_id, called_user_id)
1333 .await?;
1334 room_updated(&room, &session.peer);
1335 }
1336
1337 for connection_id in session
1338 .connection_pool()
1339 .await
1340 .user_connection_ids(called_user_id)
1341 {
1342 session
1343 .peer
1344 .send(
1345 connection_id,
1346 proto::CallCanceled {
1347 room_id: room_id.to_proto(),
1348 },
1349 )
1350 .trace_err();
1351 }
1352 response.send(proto::Ack {})?;
1353
1354 update_user_contacts(called_user_id, &session).await?;
1355 Ok(())
1356}
1357
1358async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1359 let room_id = RoomId::from_proto(message.room_id);
1360 {
1361 let room = session
1362 .db()
1363 .await
1364 .decline_call(Some(room_id), session.user_id)
1365 .await?
1366 .ok_or_else(|| anyhow!("failed to decline call"))?;
1367 room_updated(&room, &session.peer);
1368 }
1369
1370 for connection_id in session
1371 .connection_pool()
1372 .await
1373 .user_connection_ids(session.user_id)
1374 {
1375 session
1376 .peer
1377 .send(
1378 connection_id,
1379 proto::CallCanceled {
1380 room_id: room_id.to_proto(),
1381 },
1382 )
1383 .trace_err();
1384 }
1385 update_user_contacts(session.user_id, &session).await?;
1386 Ok(())
1387}
1388
1389async fn update_participant_location(
1390 request: proto::UpdateParticipantLocation,
1391 response: Response<proto::UpdateParticipantLocation>,
1392 session: Session,
1393) -> Result<()> {
1394 let room_id = RoomId::from_proto(request.room_id);
1395 let location = request
1396 .location
1397 .ok_or_else(|| anyhow!("invalid location"))?;
1398
1399 let db = session.db().await;
1400 let room = db
1401 .update_room_participant_location(room_id, session.connection_id, location)
1402 .await?;
1403
1404 room_updated(&room, &session.peer);
1405 response.send(proto::Ack {})?;
1406 Ok(())
1407}
1408
1409async fn share_project(
1410 request: proto::ShareProject,
1411 response: Response<proto::ShareProject>,
1412 session: Session,
1413) -> Result<()> {
1414 let (project_id, room) = &*session
1415 .db()
1416 .await
1417 .share_project(
1418 RoomId::from_proto(request.room_id),
1419 session.connection_id,
1420 &request.worktrees,
1421 )
1422 .await?;
1423 response.send(proto::ShareProjectResponse {
1424 project_id: project_id.to_proto(),
1425 })?;
1426 room_updated(&room, &session.peer);
1427
1428 Ok(())
1429}
1430
1431async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1432 let project_id = ProjectId::from_proto(message.project_id);
1433
1434 let (room, guest_connection_ids) = &*session
1435 .db()
1436 .await
1437 .unshare_project(project_id, session.connection_id)
1438 .await?;
1439
1440 broadcast(
1441 Some(session.connection_id),
1442 guest_connection_ids.iter().copied(),
1443 |conn_id| session.peer.send(conn_id, message.clone()),
1444 );
1445 room_updated(&room, &session.peer);
1446
1447 Ok(())
1448}
1449
1450async fn join_project(
1451 request: proto::JoinProject,
1452 response: Response<proto::JoinProject>,
1453 session: Session,
1454) -> Result<()> {
1455 let project_id = ProjectId::from_proto(request.project_id);
1456 let guest_user_id = session.user_id;
1457
1458 tracing::info!(%project_id, "join project");
1459
1460 let (project, replica_id) = &mut *session
1461 .db()
1462 .await
1463 .join_project(project_id, session.connection_id)
1464 .await?;
1465
1466 let collaborators = project
1467 .collaborators
1468 .iter()
1469 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1470 .map(|collaborator| collaborator.to_proto())
1471 .collect::<Vec<_>>();
1472
1473 let worktrees = project
1474 .worktrees
1475 .iter()
1476 .map(|(id, worktree)| proto::WorktreeMetadata {
1477 id: *id,
1478 root_name: worktree.root_name.clone(),
1479 visible: worktree.visible,
1480 abs_path: worktree.abs_path.clone(),
1481 })
1482 .collect::<Vec<_>>();
1483
1484 for collaborator in &collaborators {
1485 session
1486 .peer
1487 .send(
1488 collaborator.peer_id.unwrap().into(),
1489 proto::AddProjectCollaborator {
1490 project_id: project_id.to_proto(),
1491 collaborator: Some(proto::Collaborator {
1492 peer_id: Some(session.connection_id.into()),
1493 replica_id: replica_id.0 as u32,
1494 user_id: guest_user_id.to_proto(),
1495 }),
1496 },
1497 )
1498 .trace_err();
1499 }
1500
1501 // First, we send the metadata associated with each worktree.
1502 response.send(proto::JoinProjectResponse {
1503 worktrees: worktrees.clone(),
1504 replica_id: replica_id.0 as u32,
1505 collaborators: collaborators.clone(),
1506 language_servers: project.language_servers.clone(),
1507 })?;
1508
1509 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1510 #[cfg(any(test, feature = "test-support"))]
1511 const MAX_CHUNK_SIZE: usize = 2;
1512 #[cfg(not(any(test, feature = "test-support")))]
1513 const MAX_CHUNK_SIZE: usize = 256;
1514
1515 // Stream this worktree's entries.
1516 let message = proto::UpdateWorktree {
1517 project_id: project_id.to_proto(),
1518 worktree_id,
1519 abs_path: worktree.abs_path.clone(),
1520 root_name: worktree.root_name,
1521 updated_entries: worktree.entries,
1522 removed_entries: Default::default(),
1523 scan_id: worktree.scan_id,
1524 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1525 updated_repositories: worktree.repository_entries.into_values().collect(),
1526 removed_repositories: Default::default(),
1527 };
1528 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1529 session.peer.send(session.connection_id, update.clone())?;
1530 }
1531
1532 // Stream this worktree's diagnostics.
1533 for summary in worktree.diagnostic_summaries {
1534 session.peer.send(
1535 session.connection_id,
1536 proto::UpdateDiagnosticSummary {
1537 project_id: project_id.to_proto(),
1538 worktree_id: worktree.id,
1539 summary: Some(summary),
1540 },
1541 )?;
1542 }
1543
1544 for settings_file in worktree.settings_files {
1545 session.peer.send(
1546 session.connection_id,
1547 proto::UpdateWorktreeSettings {
1548 project_id: project_id.to_proto(),
1549 worktree_id: worktree.id,
1550 path: settings_file.path,
1551 content: Some(settings_file.content),
1552 },
1553 )?;
1554 }
1555 }
1556
1557 for language_server in &project.language_servers {
1558 session.peer.send(
1559 session.connection_id,
1560 proto::UpdateLanguageServer {
1561 project_id: project_id.to_proto(),
1562 language_server_id: language_server.id,
1563 variant: Some(
1564 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1565 proto::LspDiskBasedDiagnosticsUpdated {},
1566 ),
1567 ),
1568 },
1569 )?;
1570 }
1571
1572 Ok(())
1573}
1574
1575async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1576 let sender_id = session.connection_id;
1577 let project_id = ProjectId::from_proto(request.project_id);
1578
1579 let (room, project) = &*session
1580 .db()
1581 .await
1582 .leave_project(project_id, sender_id)
1583 .await?;
1584 tracing::info!(
1585 %project_id,
1586 host_user_id = %project.host_user_id,
1587 host_connection_id = %project.host_connection_id,
1588 "leave project"
1589 );
1590
1591 project_left(&project, &session);
1592 room_updated(&room, &session.peer);
1593
1594 Ok(())
1595}
1596
1597async fn update_project(
1598 request: proto::UpdateProject,
1599 response: Response<proto::UpdateProject>,
1600 session: Session,
1601) -> Result<()> {
1602 let project_id = ProjectId::from_proto(request.project_id);
1603 let (room, guest_connection_ids) = &*session
1604 .db()
1605 .await
1606 .update_project(project_id, session.connection_id, &request.worktrees)
1607 .await?;
1608 broadcast(
1609 Some(session.connection_id),
1610 guest_connection_ids.iter().copied(),
1611 |connection_id| {
1612 session
1613 .peer
1614 .forward_send(session.connection_id, connection_id, request.clone())
1615 },
1616 );
1617 room_updated(&room, &session.peer);
1618 response.send(proto::Ack {})?;
1619
1620 Ok(())
1621}
1622
1623async fn update_worktree(
1624 request: proto::UpdateWorktree,
1625 response: Response<proto::UpdateWorktree>,
1626 session: Session,
1627) -> Result<()> {
1628 let guest_connection_ids = session
1629 .db()
1630 .await
1631 .update_worktree(&request, session.connection_id)
1632 .await?;
1633
1634 broadcast(
1635 Some(session.connection_id),
1636 guest_connection_ids.iter().copied(),
1637 |connection_id| {
1638 session
1639 .peer
1640 .forward_send(session.connection_id, connection_id, request.clone())
1641 },
1642 );
1643 response.send(proto::Ack {})?;
1644 Ok(())
1645}
1646
1647async fn update_diagnostic_summary(
1648 message: proto::UpdateDiagnosticSummary,
1649 session: Session,
1650) -> Result<()> {
1651 let guest_connection_ids = session
1652 .db()
1653 .await
1654 .update_diagnostic_summary(&message, session.connection_id)
1655 .await?;
1656
1657 broadcast(
1658 Some(session.connection_id),
1659 guest_connection_ids.iter().copied(),
1660 |connection_id| {
1661 session
1662 .peer
1663 .forward_send(session.connection_id, connection_id, message.clone())
1664 },
1665 );
1666
1667 Ok(())
1668}
1669
1670async fn update_worktree_settings(
1671 message: proto::UpdateWorktreeSettings,
1672 session: Session,
1673) -> Result<()> {
1674 let guest_connection_ids = session
1675 .db()
1676 .await
1677 .update_worktree_settings(&message, session.connection_id)
1678 .await?;
1679
1680 broadcast(
1681 Some(session.connection_id),
1682 guest_connection_ids.iter().copied(),
1683 |connection_id| {
1684 session
1685 .peer
1686 .forward_send(session.connection_id, connection_id, message.clone())
1687 },
1688 );
1689
1690 Ok(())
1691}
1692
1693async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1694 broadcast_project_message(request.project_id, request, session).await
1695}
1696
1697async fn start_language_server(
1698 request: proto::StartLanguageServer,
1699 session: Session,
1700) -> Result<()> {
1701 let guest_connection_ids = session
1702 .db()
1703 .await
1704 .start_language_server(&request, session.connection_id)
1705 .await?;
1706
1707 broadcast(
1708 Some(session.connection_id),
1709 guest_connection_ids.iter().copied(),
1710 |connection_id| {
1711 session
1712 .peer
1713 .forward_send(session.connection_id, connection_id, request.clone())
1714 },
1715 );
1716 Ok(())
1717}
1718
1719async fn update_language_server(
1720 request: proto::UpdateLanguageServer,
1721 session: Session,
1722) -> Result<()> {
1723 session.executor.record_backtrace();
1724 let project_id = ProjectId::from_proto(request.project_id);
1725 let project_connection_ids = session
1726 .db()
1727 .await
1728 .project_connection_ids(project_id, session.connection_id)
1729 .await?;
1730 broadcast(
1731 Some(session.connection_id),
1732 project_connection_ids.iter().copied(),
1733 |connection_id| {
1734 session
1735 .peer
1736 .forward_send(session.connection_id, connection_id, request.clone())
1737 },
1738 );
1739 Ok(())
1740}
1741
1742async fn forward_project_request<T>(
1743 request: T,
1744 response: Response<T>,
1745 session: Session,
1746) -> Result<()>
1747where
1748 T: EntityMessage + RequestMessage,
1749{
1750 session.executor.record_backtrace();
1751 let project_id = ProjectId::from_proto(request.remote_entity_id());
1752 let host_connection_id = {
1753 let collaborators = session
1754 .db()
1755 .await
1756 .project_collaborators(project_id, session.connection_id)
1757 .await?;
1758 collaborators
1759 .iter()
1760 .find(|collaborator| collaborator.is_host)
1761 .ok_or_else(|| anyhow!("host not found"))?
1762 .connection_id
1763 };
1764
1765 let payload = session
1766 .peer
1767 .forward_request(session.connection_id, host_connection_id, request)
1768 .await?;
1769
1770 response.send(payload)?;
1771 Ok(())
1772}
1773
1774async fn create_buffer_for_peer(
1775 request: proto::CreateBufferForPeer,
1776 session: Session,
1777) -> Result<()> {
1778 session.executor.record_backtrace();
1779 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1780 session
1781 .peer
1782 .forward_send(session.connection_id, peer_id.into(), request)?;
1783 Ok(())
1784}
1785
1786async fn update_buffer(
1787 request: proto::UpdateBuffer,
1788 response: Response<proto::UpdateBuffer>,
1789 session: Session,
1790) -> Result<()> {
1791 session.executor.record_backtrace();
1792 let project_id = ProjectId::from_proto(request.project_id);
1793 let mut guest_connection_ids;
1794 let mut host_connection_id = None;
1795 {
1796 let collaborators = session
1797 .db()
1798 .await
1799 .project_collaborators(project_id, session.connection_id)
1800 .await?;
1801 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1802 for collaborator in collaborators.iter() {
1803 if collaborator.is_host {
1804 host_connection_id = Some(collaborator.connection_id);
1805 } else {
1806 guest_connection_ids.push(collaborator.connection_id);
1807 }
1808 }
1809 }
1810 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1811
1812 session.executor.record_backtrace();
1813 broadcast(
1814 Some(session.connection_id),
1815 guest_connection_ids,
1816 |connection_id| {
1817 session
1818 .peer
1819 .forward_send(session.connection_id, connection_id, request.clone())
1820 },
1821 );
1822 if host_connection_id != session.connection_id {
1823 session
1824 .peer
1825 .forward_request(session.connection_id, host_connection_id, request.clone())
1826 .await?;
1827 }
1828
1829 response.send(proto::Ack {})?;
1830 Ok(())
1831}
1832
1833async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1834 let project_id = ProjectId::from_proto(request.project_id);
1835 let project_connection_ids = session
1836 .db()
1837 .await
1838 .project_connection_ids(project_id, session.connection_id)
1839 .await?;
1840
1841 broadcast(
1842 Some(session.connection_id),
1843 project_connection_ids.iter().copied(),
1844 |connection_id| {
1845 session
1846 .peer
1847 .forward_send(session.connection_id, connection_id, request.clone())
1848 },
1849 );
1850 Ok(())
1851}
1852
1853async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1854 let project_id = ProjectId::from_proto(request.project_id);
1855 let project_connection_ids = session
1856 .db()
1857 .await
1858 .project_connection_ids(project_id, session.connection_id)
1859 .await?;
1860 broadcast(
1861 Some(session.connection_id),
1862 project_connection_ids.iter().copied(),
1863 |connection_id| {
1864 session
1865 .peer
1866 .forward_send(session.connection_id, connection_id, request.clone())
1867 },
1868 );
1869 Ok(())
1870}
1871
1872async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1873 broadcast_project_message(request.project_id, request, session).await
1874}
1875
1876async fn broadcast_project_message<T: EnvelopedMessage>(
1877 project_id: u64,
1878 request: T,
1879 session: Session,
1880) -> Result<()> {
1881 let project_id = ProjectId::from_proto(project_id);
1882 let project_connection_ids = session
1883 .db()
1884 .await
1885 .project_connection_ids(project_id, session.connection_id)
1886 .await?;
1887 broadcast(
1888 Some(session.connection_id),
1889 project_connection_ids.iter().copied(),
1890 |connection_id| {
1891 session
1892 .peer
1893 .forward_send(session.connection_id, connection_id, request.clone())
1894 },
1895 );
1896 Ok(())
1897}
1898
1899async fn follow(
1900 request: proto::Follow,
1901 response: Response<proto::Follow>,
1902 session: Session,
1903) -> Result<()> {
1904 let room_id = RoomId::from_proto(request.room_id);
1905 let project_id = request.project_id.map(ProjectId::from_proto);
1906 let leader_id = request
1907 .leader_id
1908 .ok_or_else(|| anyhow!("invalid leader id"))?
1909 .into();
1910 let follower_id = session.connection_id;
1911
1912 session
1913 .db()
1914 .await
1915 .check_room_participants(room_id, leader_id, session.connection_id)
1916 .await?;
1917
1918 let response_payload = session
1919 .peer
1920 .forward_request(session.connection_id, leader_id, request)
1921 .await?;
1922 response.send(response_payload)?;
1923
1924 if let Some(project_id) = project_id {
1925 let room = session
1926 .db()
1927 .await
1928 .follow(room_id, project_id, leader_id, follower_id)
1929 .await?;
1930 room_updated(&room, &session.peer);
1931 }
1932
1933 Ok(())
1934}
1935
1936async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1937 let room_id = RoomId::from_proto(request.room_id);
1938 let project_id = request.project_id.map(ProjectId::from_proto);
1939 let leader_id = request
1940 .leader_id
1941 .ok_or_else(|| anyhow!("invalid leader id"))?
1942 .into();
1943 let follower_id = session.connection_id;
1944
1945 session
1946 .db()
1947 .await
1948 .check_room_participants(room_id, leader_id, session.connection_id)
1949 .await?;
1950
1951 session
1952 .peer
1953 .forward_send(session.connection_id, leader_id, request)?;
1954
1955 if let Some(project_id) = project_id {
1956 let room = session
1957 .db()
1958 .await
1959 .unfollow(room_id, project_id, leader_id, follower_id)
1960 .await?;
1961 room_updated(&room, &session.peer);
1962 }
1963
1964 Ok(())
1965}
1966
1967async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1968 let room_id = RoomId::from_proto(request.room_id);
1969 let database = session.db.lock().await;
1970
1971 let connection_ids = if let Some(project_id) = request.project_id {
1972 let project_id = ProjectId::from_proto(project_id);
1973 database
1974 .project_connection_ids(project_id, session.connection_id)
1975 .await?
1976 } else {
1977 database
1978 .room_connection_ids(room_id, session.connection_id)
1979 .await?
1980 };
1981
1982 // For now, don't send view update messages back to that view's current leader.
1983 let connection_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
1984 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1985 _ => None,
1986 });
1987
1988 for follower_peer_id in request.follower_ids.iter().copied() {
1989 let follower_connection_id = follower_peer_id.into();
1990 if Some(follower_peer_id) != connection_id_to_omit
1991 && connection_ids.contains(&follower_connection_id)
1992 {
1993 session.peer.forward_send(
1994 session.connection_id,
1995 follower_connection_id,
1996 request.clone(),
1997 )?;
1998 }
1999 }
2000 Ok(())
2001}
2002
2003async fn get_users(
2004 request: proto::GetUsers,
2005 response: Response<proto::GetUsers>,
2006 session: Session,
2007) -> Result<()> {
2008 let user_ids = request
2009 .user_ids
2010 .into_iter()
2011 .map(UserId::from_proto)
2012 .collect();
2013 let users = session
2014 .db()
2015 .await
2016 .get_users_by_ids(user_ids)
2017 .await?
2018 .into_iter()
2019 .map(|user| proto::User {
2020 id: user.id.to_proto(),
2021 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2022 github_login: user.github_login,
2023 })
2024 .collect();
2025 response.send(proto::UsersResponse { users })?;
2026 Ok(())
2027}
2028
2029async fn fuzzy_search_users(
2030 request: proto::FuzzySearchUsers,
2031 response: Response<proto::FuzzySearchUsers>,
2032 session: Session,
2033) -> Result<()> {
2034 let query = request.query;
2035 let users = match query.len() {
2036 0 => vec![],
2037 1 | 2 => session
2038 .db()
2039 .await
2040 .get_user_by_github_login(&query)
2041 .await?
2042 .into_iter()
2043 .collect(),
2044 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2045 };
2046 let users = users
2047 .into_iter()
2048 .filter(|user| user.id != session.user_id)
2049 .map(|user| proto::User {
2050 id: user.id.to_proto(),
2051 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2052 github_login: user.github_login,
2053 })
2054 .collect();
2055 response.send(proto::UsersResponse { users })?;
2056 Ok(())
2057}
2058
2059async fn request_contact(
2060 request: proto::RequestContact,
2061 response: Response<proto::RequestContact>,
2062 session: Session,
2063) -> Result<()> {
2064 let requester_id = session.user_id;
2065 let responder_id = UserId::from_proto(request.responder_id);
2066 if requester_id == responder_id {
2067 return Err(anyhow!("cannot add yourself as a contact"))?;
2068 }
2069
2070 let notification = session
2071 .db()
2072 .await
2073 .send_contact_request(requester_id, responder_id)
2074 .await?;
2075
2076 // Update outgoing contact requests of requester
2077 let mut update = proto::UpdateContacts::default();
2078 update.outgoing_requests.push(responder_id.to_proto());
2079 for connection_id in session
2080 .connection_pool()
2081 .await
2082 .user_connection_ids(requester_id)
2083 {
2084 session.peer.send(connection_id, update.clone())?;
2085 }
2086
2087 // Update incoming contact requests of responder
2088 let mut update = proto::UpdateContacts::default();
2089 update
2090 .incoming_requests
2091 .push(proto::IncomingContactRequest {
2092 requester_id: requester_id.to_proto(),
2093 });
2094 for connection_id in session
2095 .connection_pool()
2096 .await
2097 .user_connection_ids(responder_id)
2098 {
2099 session.peer.send(connection_id, update.clone())?;
2100 session.peer.send(
2101 connection_id,
2102 proto::NewNotification {
2103 notification: Some(notification.clone()),
2104 },
2105 )?;
2106 }
2107
2108 response.send(proto::Ack {})?;
2109 Ok(())
2110}
2111
2112async fn respond_to_contact_request(
2113 request: proto::RespondToContactRequest,
2114 response: Response<proto::RespondToContactRequest>,
2115 session: Session,
2116) -> Result<()> {
2117 let responder_id = session.user_id;
2118 let requester_id = UserId::from_proto(request.requester_id);
2119 let db = session.db().await;
2120 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2121 db.dismiss_contact_notification(responder_id, requester_id)
2122 .await?;
2123 } else {
2124 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2125
2126 let notification = db
2127 .respond_to_contact_request(responder_id, requester_id, accept)
2128 .await?;
2129 let requester_busy = db.is_user_busy(requester_id).await?;
2130 let responder_busy = db.is_user_busy(responder_id).await?;
2131
2132 let pool = session.connection_pool().await;
2133 // Update responder with new contact
2134 let mut update = proto::UpdateContacts::default();
2135 if accept {
2136 update
2137 .contacts
2138 .push(contact_for_user(requester_id, requester_busy, &pool));
2139 }
2140 update
2141 .remove_incoming_requests
2142 .push(requester_id.to_proto());
2143 for connection_id in pool.user_connection_ids(responder_id) {
2144 session.peer.send(connection_id, update.clone())?;
2145 }
2146
2147 // Update requester with new contact
2148 let mut update = proto::UpdateContacts::default();
2149 if accept {
2150 update
2151 .contacts
2152 .push(contact_for_user(responder_id, responder_busy, &pool));
2153 }
2154 update
2155 .remove_outgoing_requests
2156 .push(responder_id.to_proto());
2157 for connection_id in pool.user_connection_ids(requester_id) {
2158 session.peer.send(connection_id, update.clone())?;
2159 session.peer.send(
2160 connection_id,
2161 proto::NewNotification {
2162 notification: Some(notification.clone()),
2163 },
2164 )?;
2165 }
2166 }
2167
2168 response.send(proto::Ack {})?;
2169 Ok(())
2170}
2171
2172async fn remove_contact(
2173 request: proto::RemoveContact,
2174 response: Response<proto::RemoveContact>,
2175 session: Session,
2176) -> Result<()> {
2177 let requester_id = session.user_id;
2178 let responder_id = UserId::from_proto(request.user_id);
2179 let db = session.db().await;
2180 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2181
2182 let pool = session.connection_pool().await;
2183 // Update outgoing contact requests of requester
2184 let mut update = proto::UpdateContacts::default();
2185 if contact_accepted {
2186 update.remove_contacts.push(responder_id.to_proto());
2187 } else {
2188 update
2189 .remove_outgoing_requests
2190 .push(responder_id.to_proto());
2191 }
2192 for connection_id in pool.user_connection_ids(requester_id) {
2193 session.peer.send(connection_id, update.clone())?;
2194 }
2195
2196 // Update incoming contact requests of responder
2197 let mut update = proto::UpdateContacts::default();
2198 if contact_accepted {
2199 update.remove_contacts.push(requester_id.to_proto());
2200 } else {
2201 update
2202 .remove_incoming_requests
2203 .push(requester_id.to_proto());
2204 }
2205 for connection_id in pool.user_connection_ids(responder_id) {
2206 session.peer.send(connection_id, update.clone())?;
2207 }
2208
2209 response.send(proto::Ack {})?;
2210 Ok(())
2211}
2212
2213async fn create_channel(
2214 request: proto::CreateChannel,
2215 response: Response<proto::CreateChannel>,
2216 session: Session,
2217) -> Result<()> {
2218 let db = session.db().await;
2219
2220 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2221 let id = db
2222 .create_channel(&request.name, parent_id, session.user_id)
2223 .await?;
2224
2225 let channel = proto::Channel {
2226 id: id.to_proto(),
2227 name: request.name,
2228 };
2229
2230 response.send(proto::CreateChannelResponse {
2231 channel: Some(channel.clone()),
2232 parent_id: request.parent_id,
2233 })?;
2234
2235 let Some(parent_id) = parent_id else {
2236 return Ok(());
2237 };
2238
2239 let update = proto::UpdateChannels {
2240 channels: vec![channel],
2241 insert_edge: vec![ChannelEdge {
2242 parent_id: parent_id.to_proto(),
2243 channel_id: id.to_proto(),
2244 }],
2245 ..Default::default()
2246 };
2247
2248 let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2249
2250 let connection_pool = session.connection_pool().await;
2251 for user_id in user_ids_to_notify {
2252 for connection_id in connection_pool.user_connection_ids(user_id) {
2253 if user_id == session.user_id {
2254 continue;
2255 }
2256 session.peer.send(connection_id, update.clone())?;
2257 }
2258 }
2259
2260 Ok(())
2261}
2262
2263async fn delete_channel(
2264 request: proto::DeleteChannel,
2265 response: Response<proto::DeleteChannel>,
2266 session: Session,
2267) -> Result<()> {
2268 let db = session.db().await;
2269
2270 let channel_id = request.channel_id;
2271 let (removed_channels, member_ids) = db
2272 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2273 .await?;
2274 response.send(proto::Ack {})?;
2275
2276 // Notify members of removed channels
2277 let mut update = proto::UpdateChannels::default();
2278 update
2279 .delete_channels
2280 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2281
2282 let connection_pool = session.connection_pool().await;
2283 for member_id in member_ids {
2284 for connection_id in connection_pool.user_connection_ids(member_id) {
2285 session.peer.send(connection_id, update.clone())?;
2286 }
2287 }
2288
2289 Ok(())
2290}
2291
2292async fn invite_channel_member(
2293 request: proto::InviteChannelMember,
2294 response: Response<proto::InviteChannelMember>,
2295 session: Session,
2296) -> Result<()> {
2297 let db = session.db().await;
2298 let channel_id = ChannelId::from_proto(request.channel_id);
2299 let invitee_id = UserId::from_proto(request.user_id);
2300 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2301 .await?;
2302
2303 let (channel, _) = db
2304 .get_channel(channel_id, session.user_id)
2305 .await?
2306 .ok_or_else(|| anyhow!("channel not found"))?;
2307
2308 let mut update = proto::UpdateChannels::default();
2309 update.channel_invitations.push(proto::Channel {
2310 id: channel.id.to_proto(),
2311 name: channel.name,
2312 });
2313 for connection_id in session
2314 .connection_pool()
2315 .await
2316 .user_connection_ids(invitee_id)
2317 {
2318 session.peer.send(connection_id, update.clone())?;
2319 }
2320
2321 response.send(proto::Ack {})?;
2322 Ok(())
2323}
2324
2325async fn remove_channel_member(
2326 request: proto::RemoveChannelMember,
2327 response: Response<proto::RemoveChannelMember>,
2328 session: Session,
2329) -> Result<()> {
2330 let db = session.db().await;
2331 let channel_id = ChannelId::from_proto(request.channel_id);
2332 let member_id = UserId::from_proto(request.user_id);
2333
2334 db.remove_channel_member(channel_id, member_id, session.user_id)
2335 .await?;
2336
2337 let mut update = proto::UpdateChannels::default();
2338 update.delete_channels.push(channel_id.to_proto());
2339
2340 for connection_id in session
2341 .connection_pool()
2342 .await
2343 .user_connection_ids(member_id)
2344 {
2345 session.peer.send(connection_id, update.clone())?;
2346 }
2347
2348 response.send(proto::Ack {})?;
2349 Ok(())
2350}
2351
2352async fn set_channel_member_admin(
2353 request: proto::SetChannelMemberAdmin,
2354 response: Response<proto::SetChannelMemberAdmin>,
2355 session: Session,
2356) -> Result<()> {
2357 let db = session.db().await;
2358 let channel_id = ChannelId::from_proto(request.channel_id);
2359 let member_id = UserId::from_proto(request.user_id);
2360 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2361 .await?;
2362
2363 let (channel, has_accepted) = db
2364 .get_channel(channel_id, member_id)
2365 .await?
2366 .ok_or_else(|| anyhow!("channel not found"))?;
2367
2368 let mut update = proto::UpdateChannels::default();
2369 if has_accepted {
2370 update.channel_permissions.push(proto::ChannelPermission {
2371 channel_id: channel.id.to_proto(),
2372 is_admin: request.admin,
2373 });
2374 }
2375
2376 for connection_id in session
2377 .connection_pool()
2378 .await
2379 .user_connection_ids(member_id)
2380 {
2381 session.peer.send(connection_id, update.clone())?;
2382 }
2383
2384 response.send(proto::Ack {})?;
2385 Ok(())
2386}
2387
2388async fn rename_channel(
2389 request: proto::RenameChannel,
2390 response: Response<proto::RenameChannel>,
2391 session: Session,
2392) -> Result<()> {
2393 let db = session.db().await;
2394 let channel_id = ChannelId::from_proto(request.channel_id);
2395 let new_name = db
2396 .rename_channel(channel_id, session.user_id, &request.name)
2397 .await?;
2398
2399 let channel = proto::Channel {
2400 id: request.channel_id,
2401 name: new_name,
2402 };
2403 response.send(proto::RenameChannelResponse {
2404 channel: Some(channel.clone()),
2405 })?;
2406 let mut update = proto::UpdateChannels::default();
2407 update.channels.push(channel);
2408
2409 let member_ids = db.get_channel_members(channel_id).await?;
2410
2411 let connection_pool = session.connection_pool().await;
2412 for member_id in member_ids {
2413 for connection_id in connection_pool.user_connection_ids(member_id) {
2414 session.peer.send(connection_id, update.clone())?;
2415 }
2416 }
2417
2418 Ok(())
2419}
2420
2421async fn link_channel(
2422 request: proto::LinkChannel,
2423 response: Response<proto::LinkChannel>,
2424 session: Session,
2425) -> Result<()> {
2426 let db = session.db().await;
2427 let channel_id = ChannelId::from_proto(request.channel_id);
2428 let to = ChannelId::from_proto(request.to);
2429 let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2430
2431 let members = db.get_channel_members(to).await?;
2432 let connection_pool = session.connection_pool().await;
2433 let update = proto::UpdateChannels {
2434 channels: channels_to_send
2435 .channels
2436 .into_iter()
2437 .map(|channel| proto::Channel {
2438 id: channel.id.to_proto(),
2439 name: channel.name,
2440 })
2441 .collect(),
2442 insert_edge: channels_to_send.edges,
2443 ..Default::default()
2444 };
2445 for member_id in members {
2446 for connection_id in connection_pool.user_connection_ids(member_id) {
2447 session.peer.send(connection_id, update.clone())?;
2448 }
2449 }
2450
2451 response.send(Ack {})?;
2452
2453 Ok(())
2454}
2455
2456async fn unlink_channel(
2457 request: proto::UnlinkChannel,
2458 response: Response<proto::UnlinkChannel>,
2459 session: Session,
2460) -> Result<()> {
2461 let db = session.db().await;
2462 let channel_id = ChannelId::from_proto(request.channel_id);
2463 let from = ChannelId::from_proto(request.from);
2464
2465 db.unlink_channel(session.user_id, channel_id, from).await?;
2466
2467 let members = db.get_channel_members(from).await?;
2468
2469 let update = proto::UpdateChannels {
2470 delete_edge: vec![proto::ChannelEdge {
2471 channel_id: channel_id.to_proto(),
2472 parent_id: from.to_proto(),
2473 }],
2474 ..Default::default()
2475 };
2476 let connection_pool = session.connection_pool().await;
2477 for member_id in members {
2478 for connection_id in connection_pool.user_connection_ids(member_id) {
2479 session.peer.send(connection_id, update.clone())?;
2480 }
2481 }
2482
2483 response.send(Ack {})?;
2484
2485 Ok(())
2486}
2487
2488async fn move_channel(
2489 request: proto::MoveChannel,
2490 response: Response<proto::MoveChannel>,
2491 session: Session,
2492) -> Result<()> {
2493 let db = session.db().await;
2494 let channel_id = ChannelId::from_proto(request.channel_id);
2495 let from_parent = ChannelId::from_proto(request.from);
2496 let to = ChannelId::from_proto(request.to);
2497
2498 let channels_to_send = db
2499 .move_channel(session.user_id, channel_id, from_parent, to)
2500 .await?;
2501
2502 if channels_to_send.is_empty() {
2503 response.send(Ack {})?;
2504 return Ok(());
2505 }
2506
2507 let members_from = db.get_channel_members(from_parent).await?;
2508 let members_to = db.get_channel_members(to).await?;
2509
2510 let update = proto::UpdateChannels {
2511 delete_edge: vec![proto::ChannelEdge {
2512 channel_id: channel_id.to_proto(),
2513 parent_id: from_parent.to_proto(),
2514 }],
2515 ..Default::default()
2516 };
2517 let connection_pool = session.connection_pool().await;
2518 for member_id in members_from {
2519 for connection_id in connection_pool.user_connection_ids(member_id) {
2520 session.peer.send(connection_id, update.clone())?;
2521 }
2522 }
2523
2524 let update = proto::UpdateChannels {
2525 channels: channels_to_send
2526 .channels
2527 .into_iter()
2528 .map(|channel| proto::Channel {
2529 id: channel.id.to_proto(),
2530 name: channel.name,
2531 })
2532 .collect(),
2533 insert_edge: channels_to_send.edges,
2534 ..Default::default()
2535 };
2536 for member_id in members_to {
2537 for connection_id in connection_pool.user_connection_ids(member_id) {
2538 session.peer.send(connection_id, update.clone())?;
2539 }
2540 }
2541
2542 response.send(Ack {})?;
2543
2544 Ok(())
2545}
2546
2547async fn get_channel_members(
2548 request: proto::GetChannelMembers,
2549 response: Response<proto::GetChannelMembers>,
2550 session: Session,
2551) -> Result<()> {
2552 let db = session.db().await;
2553 let channel_id = ChannelId::from_proto(request.channel_id);
2554 let members = db
2555 .get_channel_member_details(channel_id, session.user_id)
2556 .await?;
2557 response.send(proto::GetChannelMembersResponse { members })?;
2558 Ok(())
2559}
2560
2561async fn respond_to_channel_invite(
2562 request: proto::RespondToChannelInvite,
2563 response: Response<proto::RespondToChannelInvite>,
2564 session: Session,
2565) -> Result<()> {
2566 let db = session.db().await;
2567 let channel_id = ChannelId::from_proto(request.channel_id);
2568 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2569 .await?;
2570
2571 let mut update = proto::UpdateChannels::default();
2572 update
2573 .remove_channel_invitations
2574 .push(channel_id.to_proto());
2575 if request.accept {
2576 let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2577 update
2578 .channels
2579 .extend(
2580 result
2581 .channels
2582 .channels
2583 .into_iter()
2584 .map(|channel| proto::Channel {
2585 id: channel.id.to_proto(),
2586 name: channel.name,
2587 }),
2588 );
2589 update.unseen_channel_messages = result.channel_messages;
2590 update.unseen_channel_buffer_changes = result.unseen_buffer_changes;
2591 update.insert_edge = result.channels.edges;
2592 update
2593 .channel_participants
2594 .extend(
2595 result
2596 .channel_participants
2597 .into_iter()
2598 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2599 channel_id: channel_id.to_proto(),
2600 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2601 }),
2602 );
2603 update
2604 .channel_permissions
2605 .extend(
2606 result
2607 .channels_with_admin_privileges
2608 .into_iter()
2609 .map(|channel_id| proto::ChannelPermission {
2610 channel_id: channel_id.to_proto(),
2611 is_admin: true,
2612 }),
2613 );
2614 }
2615 session.peer.send(session.connection_id, update)?;
2616 response.send(proto::Ack {})?;
2617
2618 Ok(())
2619}
2620
2621async fn join_channel(
2622 request: proto::JoinChannel,
2623 response: Response<proto::JoinChannel>,
2624 session: Session,
2625) -> Result<()> {
2626 let channel_id = ChannelId::from_proto(request.channel_id);
2627 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2628
2629 let joined_room = {
2630 leave_room_for_session(&session).await?;
2631 let db = session.db().await;
2632
2633 let room_id = db
2634 .get_or_create_channel_room(channel_id, &live_kit_room, &*RELEASE_CHANNEL_NAME)
2635 .await?;
2636
2637 let joined_room = db
2638 .join_room(
2639 room_id,
2640 session.user_id,
2641 session.connection_id,
2642 RELEASE_CHANNEL_NAME.as_str(),
2643 )
2644 .await?;
2645
2646 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2647 let token = live_kit
2648 .room_token(
2649 &joined_room.room.live_kit_room,
2650 &session.user_id.to_string(),
2651 )
2652 .trace_err()?;
2653
2654 Some(LiveKitConnectionInfo {
2655 server_url: live_kit.url().into(),
2656 token,
2657 })
2658 });
2659
2660 response.send(proto::JoinRoomResponse {
2661 room: Some(joined_room.room.clone()),
2662 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2663 live_kit_connection_info,
2664 })?;
2665
2666 room_updated(&joined_room.room, &session.peer);
2667
2668 joined_room.into_inner()
2669 };
2670
2671 channel_updated(
2672 channel_id,
2673 &joined_room.room,
2674 &joined_room.channel_members,
2675 &session.peer,
2676 &*session.connection_pool().await,
2677 );
2678
2679 update_user_contacts(session.user_id, &session).await?;
2680
2681 Ok(())
2682}
2683
2684async fn join_channel_buffer(
2685 request: proto::JoinChannelBuffer,
2686 response: Response<proto::JoinChannelBuffer>,
2687 session: Session,
2688) -> Result<()> {
2689 let db = session.db().await;
2690 let channel_id = ChannelId::from_proto(request.channel_id);
2691
2692 let open_response = db
2693 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2694 .await?;
2695
2696 let collaborators = open_response.collaborators.clone();
2697 response.send(open_response)?;
2698
2699 let update = UpdateChannelBufferCollaborators {
2700 channel_id: channel_id.to_proto(),
2701 collaborators: collaborators.clone(),
2702 };
2703 channel_buffer_updated(
2704 session.connection_id,
2705 collaborators
2706 .iter()
2707 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2708 &update,
2709 &session.peer,
2710 );
2711
2712 Ok(())
2713}
2714
2715async fn update_channel_buffer(
2716 request: proto::UpdateChannelBuffer,
2717 session: Session,
2718) -> Result<()> {
2719 let db = session.db().await;
2720 let channel_id = ChannelId::from_proto(request.channel_id);
2721
2722 let (collaborators, non_collaborators, epoch, version) = db
2723 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2724 .await?;
2725
2726 channel_buffer_updated(
2727 session.connection_id,
2728 collaborators,
2729 &proto::UpdateChannelBuffer {
2730 channel_id: channel_id.to_proto(),
2731 operations: request.operations,
2732 },
2733 &session.peer,
2734 );
2735
2736 let pool = &*session.connection_pool().await;
2737
2738 broadcast(
2739 None,
2740 non_collaborators
2741 .iter()
2742 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2743 |peer_id| {
2744 session.peer.send(
2745 peer_id.into(),
2746 proto::UpdateChannels {
2747 unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
2748 channel_id: channel_id.to_proto(),
2749 epoch: epoch as u64,
2750 version: version.clone(),
2751 }],
2752 ..Default::default()
2753 },
2754 )
2755 },
2756 );
2757
2758 Ok(())
2759}
2760
2761async fn rejoin_channel_buffers(
2762 request: proto::RejoinChannelBuffers,
2763 response: Response<proto::RejoinChannelBuffers>,
2764 session: Session,
2765) -> Result<()> {
2766 let db = session.db().await;
2767 let buffers = db
2768 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2769 .await?;
2770
2771 for rejoined_buffer in &buffers {
2772 let collaborators_to_notify = rejoined_buffer
2773 .buffer
2774 .collaborators
2775 .iter()
2776 .filter_map(|c| Some(c.peer_id?.into()));
2777 channel_buffer_updated(
2778 session.connection_id,
2779 collaborators_to_notify,
2780 &proto::UpdateChannelBufferCollaborators {
2781 channel_id: rejoined_buffer.buffer.channel_id,
2782 collaborators: rejoined_buffer.buffer.collaborators.clone(),
2783 },
2784 &session.peer,
2785 );
2786 }
2787
2788 response.send(proto::RejoinChannelBuffersResponse {
2789 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2790 })?;
2791
2792 Ok(())
2793}
2794
2795async fn leave_channel_buffer(
2796 request: proto::LeaveChannelBuffer,
2797 response: Response<proto::LeaveChannelBuffer>,
2798 session: Session,
2799) -> Result<()> {
2800 let db = session.db().await;
2801 let channel_id = ChannelId::from_proto(request.channel_id);
2802
2803 let left_buffer = db
2804 .leave_channel_buffer(channel_id, session.connection_id)
2805 .await?;
2806
2807 response.send(Ack {})?;
2808
2809 channel_buffer_updated(
2810 session.connection_id,
2811 left_buffer.connections,
2812 &proto::UpdateChannelBufferCollaborators {
2813 channel_id: channel_id.to_proto(),
2814 collaborators: left_buffer.collaborators,
2815 },
2816 &session.peer,
2817 );
2818
2819 Ok(())
2820}
2821
2822fn channel_buffer_updated<T: EnvelopedMessage>(
2823 sender_id: ConnectionId,
2824 collaborators: impl IntoIterator<Item = ConnectionId>,
2825 message: &T,
2826 peer: &Peer,
2827) {
2828 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2829 peer.send(peer_id.into(), message.clone())
2830 });
2831}
2832
2833async fn send_channel_message(
2834 request: proto::SendChannelMessage,
2835 response: Response<proto::SendChannelMessage>,
2836 session: Session,
2837) -> Result<()> {
2838 // Validate the message body.
2839 let body = request.body.trim().to_string();
2840 if body.len() > MAX_MESSAGE_LEN {
2841 return Err(anyhow!("message is too long"))?;
2842 }
2843 if body.is_empty() {
2844 return Err(anyhow!("message can't be blank"))?;
2845 }
2846
2847 let timestamp = OffsetDateTime::now_utc();
2848 let nonce = request
2849 .nonce
2850 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2851
2852 let channel_id = ChannelId::from_proto(request.channel_id);
2853 let (message_id, connection_ids, non_participants) = session
2854 .db()
2855 .await
2856 .create_channel_message(
2857 channel_id,
2858 session.user_id,
2859 &body,
2860 timestamp,
2861 nonce.clone().into(),
2862 )
2863 .await?;
2864 let message = proto::ChannelMessage {
2865 sender_id: session.user_id.to_proto(),
2866 id: message_id.to_proto(),
2867 body,
2868 timestamp: timestamp.unix_timestamp() as u64,
2869 nonce: Some(nonce),
2870 };
2871 broadcast(Some(session.connection_id), connection_ids, |connection| {
2872 session.peer.send(
2873 connection,
2874 proto::ChannelMessageSent {
2875 channel_id: channel_id.to_proto(),
2876 message: Some(message.clone()),
2877 },
2878 )
2879 });
2880 response.send(proto::SendChannelMessageResponse {
2881 message: Some(message),
2882 })?;
2883
2884 let pool = &*session.connection_pool().await;
2885 broadcast(
2886 None,
2887 non_participants
2888 .iter()
2889 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2890 |peer_id| {
2891 session.peer.send(
2892 peer_id.into(),
2893 proto::UpdateChannels {
2894 unseen_channel_messages: vec![proto::UnseenChannelMessage {
2895 channel_id: channel_id.to_proto(),
2896 message_id: message_id.to_proto(),
2897 }],
2898 ..Default::default()
2899 },
2900 )
2901 },
2902 );
2903
2904 Ok(())
2905}
2906
2907async fn remove_channel_message(
2908 request: proto::RemoveChannelMessage,
2909 response: Response<proto::RemoveChannelMessage>,
2910 session: Session,
2911) -> Result<()> {
2912 let channel_id = ChannelId::from_proto(request.channel_id);
2913 let message_id = MessageId::from_proto(request.message_id);
2914 let connection_ids = session
2915 .db()
2916 .await
2917 .remove_channel_message(channel_id, message_id, session.user_id)
2918 .await?;
2919 broadcast(Some(session.connection_id), connection_ids, |connection| {
2920 session.peer.send(connection, request.clone())
2921 });
2922 response.send(proto::Ack {})?;
2923 Ok(())
2924}
2925
2926async fn acknowledge_channel_message(
2927 request: proto::AckChannelMessage,
2928 session: Session,
2929) -> Result<()> {
2930 let channel_id = ChannelId::from_proto(request.channel_id);
2931 let message_id = MessageId::from_proto(request.message_id);
2932 session
2933 .db()
2934 .await
2935 .observe_channel_message(channel_id, session.user_id, message_id)
2936 .await?;
2937 Ok(())
2938}
2939
2940async fn acknowledge_buffer_version(
2941 request: proto::AckBufferOperation,
2942 session: Session,
2943) -> Result<()> {
2944 let buffer_id = BufferId::from_proto(request.buffer_id);
2945 session
2946 .db()
2947 .await
2948 .observe_buffer_version(
2949 buffer_id,
2950 session.user_id,
2951 request.epoch as i32,
2952 &request.version,
2953 )
2954 .await?;
2955 Ok(())
2956}
2957
2958async fn join_channel_chat(
2959 request: proto::JoinChannelChat,
2960 response: Response<proto::JoinChannelChat>,
2961 session: Session,
2962) -> Result<()> {
2963 let channel_id = ChannelId::from_proto(request.channel_id);
2964
2965 let db = session.db().await;
2966 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2967 .await?;
2968 let messages = db
2969 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2970 .await?;
2971 response.send(proto::JoinChannelChatResponse {
2972 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2973 messages,
2974 })?;
2975 Ok(())
2976}
2977
2978async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2979 let channel_id = ChannelId::from_proto(request.channel_id);
2980 session
2981 .db()
2982 .await
2983 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2984 .await?;
2985 Ok(())
2986}
2987
2988async fn get_channel_messages(
2989 request: proto::GetChannelMessages,
2990 response: Response<proto::GetChannelMessages>,
2991 session: Session,
2992) -> Result<()> {
2993 let channel_id = ChannelId::from_proto(request.channel_id);
2994 let messages = session
2995 .db()
2996 .await
2997 .get_channel_messages(
2998 channel_id,
2999 session.user_id,
3000 MESSAGE_COUNT_PER_PAGE,
3001 Some(MessageId::from_proto(request.before_message_id)),
3002 )
3003 .await?;
3004 response.send(proto::GetChannelMessagesResponse {
3005 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3006 messages,
3007 })?;
3008 Ok(())
3009}
3010
3011async fn get_notifications(
3012 request: proto::GetNotifications,
3013 response: Response<proto::GetNotifications>,
3014 session: Session,
3015) -> Result<()> {
3016 let notifications = session
3017 .db()
3018 .await
3019 .get_notifications(
3020 session.user_id,
3021 NOTIFICATION_COUNT_PER_PAGE,
3022 request
3023 .before_id
3024 .map(|id| db::NotificationId::from_proto(id)),
3025 )
3026 .await?;
3027 response.send(proto::GetNotificationsResponse { notifications })?;
3028 Ok(())
3029}
3030
3031async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
3032 let project_id = ProjectId::from_proto(request.project_id);
3033 let project_connection_ids = session
3034 .db()
3035 .await
3036 .project_connection_ids(project_id, session.connection_id)
3037 .await?;
3038 broadcast(
3039 Some(session.connection_id),
3040 project_connection_ids.iter().copied(),
3041 |connection_id| {
3042 session
3043 .peer
3044 .forward_send(session.connection_id, connection_id, request.clone())
3045 },
3046 );
3047 Ok(())
3048}
3049
3050async fn get_private_user_info(
3051 _request: proto::GetPrivateUserInfo,
3052 response: Response<proto::GetPrivateUserInfo>,
3053 session: Session,
3054) -> Result<()> {
3055 let db = session.db().await;
3056
3057 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
3058 let user = db
3059 .get_user_by_id(session.user_id)
3060 .await?
3061 .ok_or_else(|| anyhow!("user not found"))?;
3062 let flags = db.get_user_flags(session.user_id).await?;
3063
3064 response.send(proto::GetPrivateUserInfoResponse {
3065 metrics_id,
3066 staff: user.admin,
3067 flags,
3068 })?;
3069 Ok(())
3070}
3071
3072fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
3073 match message {
3074 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
3075 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
3076 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
3077 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
3078 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
3079 code: frame.code.into(),
3080 reason: frame.reason,
3081 })),
3082 }
3083}
3084
3085fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
3086 match message {
3087 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
3088 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
3089 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
3090 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
3091 AxumMessage::Close(frame) => {
3092 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
3093 code: frame.code.into(),
3094 reason: frame.reason,
3095 }))
3096 }
3097 }
3098}
3099
3100fn build_initial_channels_update(
3101 channels: ChannelsForUser,
3102 channel_invites: Vec<db::Channel>,
3103) -> proto::UpdateChannels {
3104 let mut update = proto::UpdateChannels::default();
3105
3106 for channel in channels.channels.channels {
3107 update.channels.push(proto::Channel {
3108 id: channel.id.to_proto(),
3109 name: channel.name,
3110 });
3111 }
3112
3113 update.unseen_channel_buffer_changes = channels.unseen_buffer_changes;
3114 update.unseen_channel_messages = channels.channel_messages;
3115 update.insert_edge = channels.channels.edges;
3116
3117 for (channel_id, participants) in channels.channel_participants {
3118 update
3119 .channel_participants
3120 .push(proto::ChannelParticipants {
3121 channel_id: channel_id.to_proto(),
3122 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
3123 });
3124 }
3125
3126 update
3127 .channel_permissions
3128 .extend(
3129 channels
3130 .channels_with_admin_privileges
3131 .into_iter()
3132 .map(|id| proto::ChannelPermission {
3133 channel_id: id.to_proto(),
3134 is_admin: true,
3135 }),
3136 );
3137
3138 for channel in channel_invites {
3139 update.channel_invitations.push(proto::Channel {
3140 id: channel.id.to_proto(),
3141 name: channel.name,
3142 });
3143 }
3144
3145 update
3146}
3147
3148fn build_initial_contacts_update(
3149 contacts: Vec<db::Contact>,
3150 pool: &ConnectionPool,
3151) -> proto::UpdateContacts {
3152 let mut update = proto::UpdateContacts::default();
3153
3154 for contact in contacts {
3155 match contact {
3156 db::Contact::Accepted { user_id, busy } => {
3157 update.contacts.push(contact_for_user(user_id, busy, &pool));
3158 }
3159 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3160 db::Contact::Incoming { user_id } => {
3161 update
3162 .incoming_requests
3163 .push(proto::IncomingContactRequest {
3164 requester_id: user_id.to_proto(),
3165 })
3166 }
3167 }
3168 }
3169
3170 update
3171}
3172
3173fn contact_for_user(user_id: UserId, busy: bool, pool: &ConnectionPool) -> proto::Contact {
3174 proto::Contact {
3175 user_id: user_id.to_proto(),
3176 online: pool.is_user_online(user_id),
3177 busy,
3178 }
3179}
3180
3181fn room_updated(room: &proto::Room, peer: &Peer) {
3182 broadcast(
3183 None,
3184 room.participants
3185 .iter()
3186 .filter_map(|participant| Some(participant.peer_id?.into())),
3187 |peer_id| {
3188 peer.send(
3189 peer_id.into(),
3190 proto::RoomUpdated {
3191 room: Some(room.clone()),
3192 },
3193 )
3194 },
3195 );
3196}
3197
3198fn channel_updated(
3199 channel_id: ChannelId,
3200 room: &proto::Room,
3201 channel_members: &[UserId],
3202 peer: &Peer,
3203 pool: &ConnectionPool,
3204) {
3205 let participants = room
3206 .participants
3207 .iter()
3208 .map(|p| p.user_id)
3209 .collect::<Vec<_>>();
3210
3211 broadcast(
3212 None,
3213 channel_members
3214 .iter()
3215 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3216 |peer_id| {
3217 peer.send(
3218 peer_id.into(),
3219 proto::UpdateChannels {
3220 channel_participants: vec![proto::ChannelParticipants {
3221 channel_id: channel_id.to_proto(),
3222 participant_user_ids: participants.clone(),
3223 }],
3224 ..Default::default()
3225 },
3226 )
3227 },
3228 );
3229}
3230
3231async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3232 let db = session.db().await;
3233
3234 let contacts = db.get_contacts(user_id).await?;
3235 let busy = db.is_user_busy(user_id).await?;
3236
3237 let pool = session.connection_pool().await;
3238 let updated_contact = contact_for_user(user_id, busy, &pool);
3239 for contact in contacts {
3240 if let db::Contact::Accepted {
3241 user_id: contact_user_id,
3242 ..
3243 } = contact
3244 {
3245 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3246 session
3247 .peer
3248 .send(
3249 contact_conn_id,
3250 proto::UpdateContacts {
3251 contacts: vec![updated_contact.clone()],
3252 remove_contacts: Default::default(),
3253 incoming_requests: Default::default(),
3254 remove_incoming_requests: Default::default(),
3255 outgoing_requests: Default::default(),
3256 remove_outgoing_requests: Default::default(),
3257 },
3258 )
3259 .trace_err();
3260 }
3261 }
3262 }
3263 Ok(())
3264}
3265
3266async fn leave_room_for_session(session: &Session) -> Result<()> {
3267 let mut contacts_to_update = HashSet::default();
3268
3269 let room_id;
3270 let canceled_calls_to_user_ids;
3271 let live_kit_room;
3272 let delete_live_kit_room;
3273 let room;
3274 let channel_members;
3275 let channel_id;
3276
3277 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3278 contacts_to_update.insert(session.user_id);
3279
3280 for project in left_room.left_projects.values() {
3281 project_left(project, session);
3282 }
3283
3284 room_id = RoomId::from_proto(left_room.room.id);
3285 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3286 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3287 delete_live_kit_room = left_room.deleted;
3288 room = mem::take(&mut left_room.room);
3289 channel_members = mem::take(&mut left_room.channel_members);
3290 channel_id = left_room.channel_id;
3291
3292 room_updated(&room, &session.peer);
3293 } else {
3294 return Ok(());
3295 }
3296
3297 if let Some(channel_id) = channel_id {
3298 channel_updated(
3299 channel_id,
3300 &room,
3301 &channel_members,
3302 &session.peer,
3303 &*session.connection_pool().await,
3304 );
3305 }
3306
3307 {
3308 let pool = session.connection_pool().await;
3309 for canceled_user_id in canceled_calls_to_user_ids {
3310 for connection_id in pool.user_connection_ids(canceled_user_id) {
3311 session
3312 .peer
3313 .send(
3314 connection_id,
3315 proto::CallCanceled {
3316 room_id: room_id.to_proto(),
3317 },
3318 )
3319 .trace_err();
3320 }
3321 contacts_to_update.insert(canceled_user_id);
3322 }
3323 }
3324
3325 for contact_user_id in contacts_to_update {
3326 update_user_contacts(contact_user_id, &session).await?;
3327 }
3328
3329 if let Some(live_kit) = session.live_kit_client.as_ref() {
3330 live_kit
3331 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3332 .await
3333 .trace_err();
3334
3335 if delete_live_kit_room {
3336 live_kit.delete_room(live_kit_room).await.trace_err();
3337 }
3338 }
3339
3340 Ok(())
3341}
3342
3343async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3344 let left_channel_buffers = session
3345 .db()
3346 .await
3347 .leave_channel_buffers(session.connection_id)
3348 .await?;
3349
3350 for left_buffer in left_channel_buffers {
3351 channel_buffer_updated(
3352 session.connection_id,
3353 left_buffer.connections,
3354 &proto::UpdateChannelBufferCollaborators {
3355 channel_id: left_buffer.channel_id.to_proto(),
3356 collaborators: left_buffer.collaborators,
3357 },
3358 &session.peer,
3359 );
3360 }
3361
3362 Ok(())
3363}
3364
3365fn project_left(project: &db::LeftProject, session: &Session) {
3366 for connection_id in &project.connection_ids {
3367 if project.host_user_id == session.user_id {
3368 session
3369 .peer
3370 .send(
3371 *connection_id,
3372 proto::UnshareProject {
3373 project_id: project.id.to_proto(),
3374 },
3375 )
3376 .trace_err();
3377 } else {
3378 session
3379 .peer
3380 .send(
3381 *connection_id,
3382 proto::RemoveProjectCollaborator {
3383 project_id: project.id.to_proto(),
3384 peer_id: Some(session.connection_id.into()),
3385 },
3386 )
3387 .trace_err();
3388 }
3389 }
3390}
3391
3392pub trait ResultExt {
3393 type Ok;
3394
3395 fn trace_err(self) -> Option<Self::Ok>;
3396}
3397
3398impl<T, E> ResultExt for Result<T, E>
3399where
3400 E: std::fmt::Debug,
3401{
3402 type Ok = T;
3403
3404 fn trace_err(self) -> Option<T> {
3405 match self {
3406 Ok(value) => Some(value),
3407 Err(error) => {
3408 tracing::error!("{:?}", error);
3409 None
3410 }
3411 }
3412 }
3413}