1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, BufferId, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId,
7 ServerId, User, UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AnyTypedEnvelope, ChannelEdge, ChannelVisibility, EntityMessage,
42 EnvelopedMessage, LiveKitConnectionInfo, RequestMessage, UpdateChannelBufferCollaborators,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66use util::channel::RELEASE_CHANNEL_NAME;
67
68pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
69pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73
74lazy_static! {
75 static ref METRIC_CONNECTIONS: IntGauge =
76 register_int_gauge!("connections", "number of connections").unwrap();
77 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
78 "shared_projects",
79 "number of open projects with one or more guests"
80 )
81 .unwrap();
82}
83
84type MessageHandler =
85 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
86
87struct Response<R> {
88 peer: Arc<Peer>,
89 receipt: Receipt<R>,
90 responded: Arc<AtomicBool>,
91}
92
93impl<R: RequestMessage> Response<R> {
94 fn send(self, payload: R::Response) -> Result<()> {
95 self.responded.store(true, SeqCst);
96 self.peer.respond(self.receipt, payload)?;
97 Ok(())
98 }
99}
100
101#[derive(Clone)]
102struct Session {
103 user_id: UserId,
104 connection_id: ConnectionId,
105 db: Arc<tokio::sync::Mutex<DbHandle>>,
106 peer: Arc<Peer>,
107 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
108 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
109 executor: Executor,
110}
111
112impl Session {
113 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
114 #[cfg(test)]
115 tokio::task::yield_now().await;
116 let guard = self.db.lock().await;
117 #[cfg(test)]
118 tokio::task::yield_now().await;
119 guard
120 }
121
122 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
123 #[cfg(test)]
124 tokio::task::yield_now().await;
125 let guard = self.connection_pool.lock();
126 ConnectionPoolGuard {
127 guard,
128 _not_send: PhantomData,
129 }
130 }
131}
132
133impl fmt::Debug for Session {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_struct("Session")
136 .field("user_id", &self.user_id)
137 .field("connection_id", &self.connection_id)
138 .finish()
139 }
140}
141
142struct DbHandle(Arc<Database>);
143
144impl Deref for DbHandle {
145 type Target = Database;
146
147 fn deref(&self) -> &Self::Target {
148 self.0.as_ref()
149 }
150}
151
152pub struct Server {
153 id: parking_lot::Mutex<ServerId>,
154 peer: Arc<Peer>,
155 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
156 app_state: Arc<AppState>,
157 executor: Executor,
158 handlers: HashMap<TypeId, MessageHandler>,
159 teardown: watch::Sender<()>,
160}
161
162pub(crate) struct ConnectionPoolGuard<'a> {
163 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
164 _not_send: PhantomData<Rc<()>>,
165}
166
167#[derive(Serialize)]
168pub struct ServerSnapshot<'a> {
169 peer: &'a Peer,
170 #[serde(serialize_with = "serialize_deref")]
171 connection_pool: ConnectionPoolGuard<'a>,
172}
173
174pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
175where
176 S: Serializer,
177 T: Deref<Target = U>,
178 U: Serialize,
179{
180 Serialize::serialize(value.deref(), serializer)
181}
182
183impl Server {
184 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
185 let mut server = Self {
186 id: parking_lot::Mutex::new(id),
187 peer: Peer::new(id.0 as u32),
188 app_state,
189 executor,
190 connection_pool: Default::default(),
191 handlers: Default::default(),
192 teardown: watch::channel(()).0,
193 };
194
195 server
196 .add_request_handler(ping)
197 .add_request_handler(create_room)
198 .add_request_handler(join_room)
199 .add_request_handler(rejoin_room)
200 .add_request_handler(leave_room)
201 .add_request_handler(call)
202 .add_request_handler(cancel_call)
203 .add_message_handler(decline_call)
204 .add_request_handler(update_participant_location)
205 .add_request_handler(share_project)
206 .add_message_handler(unshare_project)
207 .add_request_handler(join_project)
208 .add_message_handler(leave_project)
209 .add_request_handler(update_project)
210 .add_request_handler(update_worktree)
211 .add_message_handler(start_language_server)
212 .add_message_handler(update_language_server)
213 .add_message_handler(update_diagnostic_summary)
214 .add_message_handler(update_worktree_settings)
215 .add_message_handler(refresh_inlay_hints)
216 .add_request_handler(forward_project_request::<proto::GetHover>)
217 .add_request_handler(forward_project_request::<proto::GetDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
219 .add_request_handler(forward_project_request::<proto::GetReferences>)
220 .add_request_handler(forward_project_request::<proto::SearchProject>)
221 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
222 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
225 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
226 .add_request_handler(forward_project_request::<proto::GetCompletions>)
227 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
228 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
229 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
230 .add_request_handler(forward_project_request::<proto::PrepareRename>)
231 .add_request_handler(forward_project_request::<proto::PerformRename>)
232 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
233 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
234 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
235 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
240 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
241 .add_request_handler(forward_project_request::<proto::InlayHints>)
242 .add_message_handler(create_buffer_for_peer)
243 .add_request_handler(update_buffer)
244 .add_message_handler(update_buffer_file)
245 .add_message_handler(buffer_reloaded)
246 .add_message_handler(buffer_saved)
247 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
248 .add_request_handler(get_users)
249 .add_request_handler(fuzzy_search_users)
250 .add_request_handler(request_contact)
251 .add_request_handler(remove_contact)
252 .add_request_handler(respond_to_contact_request)
253 .add_request_handler(create_channel)
254 .add_request_handler(delete_channel)
255 .add_request_handler(invite_channel_member)
256 .add_request_handler(remove_channel_member)
257 .add_request_handler(set_channel_member_role)
258 .add_request_handler(rename_channel)
259 .add_request_handler(join_channel_buffer)
260 .add_request_handler(leave_channel_buffer)
261 .add_message_handler(update_channel_buffer)
262 .add_request_handler(rejoin_channel_buffers)
263 .add_request_handler(get_channel_members)
264 .add_request_handler(respond_to_channel_invite)
265 .add_request_handler(join_channel)
266 .add_request_handler(join_channel_chat)
267 .add_message_handler(leave_channel_chat)
268 .add_request_handler(send_channel_message)
269 .add_request_handler(remove_channel_message)
270 .add_request_handler(get_channel_messages)
271 .add_request_handler(link_channel)
272 .add_request_handler(unlink_channel)
273 .add_request_handler(move_channel)
274 .add_request_handler(follow)
275 .add_message_handler(unfollow)
276 .add_message_handler(update_followers)
277 .add_message_handler(update_diff_base)
278 .add_request_handler(get_private_user_info)
279 .add_message_handler(acknowledge_channel_message)
280 .add_message_handler(acknowledge_buffer_version);
281
282 Arc::new(server)
283 }
284
285 pub async fn start(&self) -> Result<()> {
286 let server_id = *self.id.lock();
287 let app_state = self.app_state.clone();
288 let peer = self.peer.clone();
289 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
290 let pool = self.connection_pool.clone();
291 let live_kit_client = self.app_state.live_kit_client.clone();
292
293 let span = info_span!("start server");
294 self.executor.spawn_detached(
295 async move {
296 tracing::info!("waiting for cleanup timeout");
297 timeout.await;
298 tracing::info!("cleanup timeout expired, retrieving stale rooms");
299 if let Some((room_ids, channel_ids)) = app_state
300 .db
301 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
302 .await
303 .trace_err()
304 {
305 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
306 tracing::info!(
307 stale_channel_buffer_count = channel_ids.len(),
308 "retrieved stale channel buffers"
309 );
310
311 for channel_id in channel_ids {
312 if let Some(refreshed_channel_buffer) = app_state
313 .db
314 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
315 .await
316 .trace_err()
317 {
318 for connection_id in refreshed_channel_buffer.connection_ids {
319 peer.send(
320 connection_id,
321 proto::UpdateChannelBufferCollaborators {
322 channel_id: channel_id.to_proto(),
323 collaborators: refreshed_channel_buffer
324 .collaborators
325 .clone(),
326 },
327 )
328 .trace_err();
329 }
330 }
331 }
332
333 for room_id in room_ids {
334 let mut contacts_to_update = HashSet::default();
335 let mut canceled_calls_to_user_ids = Vec::new();
336 let mut live_kit_room = String::new();
337 let mut delete_live_kit_room = false;
338
339 if let Some(mut refreshed_room) = app_state
340 .db
341 .clear_stale_room_participants(room_id, server_id)
342 .await
343 .trace_err()
344 {
345 tracing::info!(
346 room_id = room_id.0,
347 new_participant_count = refreshed_room.room.participants.len(),
348 "refreshed room"
349 );
350 room_updated(&refreshed_room.room, &peer);
351 if let Some(channel_id) = refreshed_room.channel_id {
352 channel_updated(
353 channel_id,
354 &refreshed_room.room,
355 &refreshed_room.channel_members,
356 &peer,
357 &*pool.lock(),
358 );
359 }
360 contacts_to_update
361 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
362 contacts_to_update
363 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
364 canceled_calls_to_user_ids =
365 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
366 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
367 delete_live_kit_room = refreshed_room.room.participants.is_empty();
368 }
369
370 {
371 let pool = pool.lock();
372 for canceled_user_id in canceled_calls_to_user_ids {
373 for connection_id in pool.user_connection_ids(canceled_user_id) {
374 peer.send(
375 connection_id,
376 proto::CallCanceled {
377 room_id: room_id.to_proto(),
378 },
379 )
380 .trace_err();
381 }
382 }
383 }
384
385 for user_id in contacts_to_update {
386 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
387 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
388 if let Some((busy, contacts)) = busy.zip(contacts) {
389 let pool = pool.lock();
390 let updated_contact = contact_for_user(user_id, false, busy, &pool);
391 for contact in contacts {
392 if let db::Contact::Accepted {
393 user_id: contact_user_id,
394 ..
395 } = contact
396 {
397 for contact_conn_id in
398 pool.user_connection_ids(contact_user_id)
399 {
400 peer.send(
401 contact_conn_id,
402 proto::UpdateContacts {
403 contacts: vec![updated_contact.clone()],
404 remove_contacts: Default::default(),
405 incoming_requests: Default::default(),
406 remove_incoming_requests: Default::default(),
407 outgoing_requests: Default::default(),
408 remove_outgoing_requests: Default::default(),
409 },
410 )
411 .trace_err();
412 }
413 }
414 }
415 }
416 }
417
418 if let Some(live_kit) = live_kit_client.as_ref() {
419 if delete_live_kit_room {
420 live_kit.delete_room(live_kit_room).await.trace_err();
421 }
422 }
423 }
424 }
425
426 app_state
427 .db
428 .delete_stale_servers(&app_state.config.zed_environment, server_id)
429 .await
430 .trace_err();
431 }
432 .instrument(span),
433 );
434 Ok(())
435 }
436
437 pub fn teardown(&self) {
438 self.peer.teardown();
439 self.connection_pool.lock().reset();
440 let _ = self.teardown.send(());
441 }
442
443 #[cfg(test)]
444 pub fn reset(&self, id: ServerId) {
445 self.teardown();
446 *self.id.lock() = id;
447 self.peer.reset(id.0 as u32);
448 }
449
450 #[cfg(test)]
451 pub fn id(&self) -> ServerId {
452 *self.id.lock()
453 }
454
455 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
456 where
457 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
458 Fut: 'static + Send + Future<Output = Result<()>>,
459 M: EnvelopedMessage,
460 {
461 let prev_handler = self.handlers.insert(
462 TypeId::of::<M>(),
463 Box::new(move |envelope, session| {
464 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
465 let span = info_span!(
466 "handle message",
467 payload_type = envelope.payload_type_name()
468 );
469 span.in_scope(|| {
470 tracing::info!(
471 payload_type = envelope.payload_type_name(),
472 "message received"
473 );
474 });
475 let start_time = Instant::now();
476 let future = (handler)(*envelope, session);
477 async move {
478 let result = future.await;
479 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
480 match result {
481 Err(error) => {
482 tracing::error!(%error, ?duration_ms, "error handling message")
483 }
484 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
485 }
486 }
487 .instrument(span)
488 .boxed()
489 }),
490 );
491 if prev_handler.is_some() {
492 panic!("registered a handler for the same message twice");
493 }
494 self
495 }
496
497 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
498 where
499 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
500 Fut: 'static + Send + Future<Output = Result<()>>,
501 M: EnvelopedMessage,
502 {
503 self.add_handler(move |envelope, session| handler(envelope.payload, session));
504 self
505 }
506
507 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
508 where
509 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
510 Fut: Send + Future<Output = Result<()>>,
511 M: RequestMessage,
512 {
513 let handler = Arc::new(handler);
514 self.add_handler(move |envelope, session| {
515 let receipt = envelope.receipt();
516 let handler = handler.clone();
517 async move {
518 let peer = session.peer.clone();
519 let responded = Arc::new(AtomicBool::default());
520 let response = Response {
521 peer: peer.clone(),
522 responded: responded.clone(),
523 receipt,
524 };
525 match (handler)(envelope.payload, response, session).await {
526 Ok(()) => {
527 if responded.load(std::sync::atomic::Ordering::SeqCst) {
528 Ok(())
529 } else {
530 Err(anyhow!("handler did not send a response"))?
531 }
532 }
533 Err(error) => {
534 peer.respond_with_error(
535 receipt,
536 proto::Error {
537 message: error.to_string(),
538 },
539 )?;
540 Err(error)
541 }
542 }
543 }
544 })
545 }
546
547 pub fn handle_connection(
548 self: &Arc<Self>,
549 connection: Connection,
550 address: String,
551 user: User,
552 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
553 executor: Executor,
554 ) -> impl Future<Output = Result<()>> {
555 let this = self.clone();
556 let user_id = user.id;
557 let login = user.github_login;
558 let span = info_span!("handle connection", %user_id, %login, %address);
559 let mut teardown = self.teardown.subscribe();
560 async move {
561 let (connection_id, handle_io, mut incoming_rx) = this
562 .peer
563 .add_connection(connection, {
564 let executor = executor.clone();
565 move |duration| executor.sleep(duration)
566 });
567
568 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
569 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
570 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
571
572 if let Some(send_connection_id) = send_connection_id.take() {
573 let _ = send_connection_id.send(connection_id);
574 }
575
576 if !user.connected_once {
577 this.peer.send(connection_id, proto::ShowContacts {})?;
578 this.app_state.db.set_user_connected_once(user_id, true).await?;
579 }
580
581 let (contacts, channels_for_user, channel_invites) = future::try_join3(
582 this.app_state.db.get_contacts(user_id),
583 this.app_state.db.get_channels_for_user(user_id),
584 this.app_state.db.get_channel_invites_for_user(user_id)
585 ).await?;
586
587 {
588 let mut pool = this.connection_pool.lock();
589 pool.add_connection(connection_id, user_id, user.admin);
590 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
591 this.peer.send(connection_id, build_initial_channels_update(
592 channels_for_user,
593 channel_invites
594 ))?;
595 }
596
597 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
598 this.peer.send(connection_id, incoming_call)?;
599 }
600
601 let session = Session {
602 user_id,
603 connection_id,
604 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
605 peer: this.peer.clone(),
606 connection_pool: this.connection_pool.clone(),
607 live_kit_client: this.app_state.live_kit_client.clone(),
608 executor: executor.clone(),
609 };
610 update_user_contacts(user_id, &session).await?;
611
612 let handle_io = handle_io.fuse();
613 futures::pin_mut!(handle_io);
614
615 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
616 // This prevents deadlocks when e.g., client A performs a request to client B and
617 // client B performs a request to client A. If both clients stop processing further
618 // messages until their respective request completes, they won't have a chance to
619 // respond to the other client's request and cause a deadlock.
620 //
621 // This arrangement ensures we will attempt to process earlier messages first, but fall
622 // back to processing messages arrived later in the spirit of making progress.
623 let mut foreground_message_handlers = FuturesUnordered::new();
624 let concurrent_handlers = Arc::new(Semaphore::new(256));
625 loop {
626 let next_message = async {
627 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
628 let message = incoming_rx.next().await;
629 (permit, message)
630 }.fuse();
631 futures::pin_mut!(next_message);
632 futures::select_biased! {
633 _ = teardown.changed().fuse() => return Ok(()),
634 result = handle_io => {
635 if let Err(error) = result {
636 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
637 }
638 break;
639 }
640 _ = foreground_message_handlers.next() => {}
641 next_message = next_message => {
642 let (permit, message) = next_message;
643 if let Some(message) = message {
644 let type_name = message.payload_type_name();
645 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
646 let span_enter = span.enter();
647 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
648 let is_background = message.is_background();
649 let handle_message = (handler)(message, session.clone());
650 drop(span_enter);
651
652 let handle_message = async move {
653 handle_message.await;
654 drop(permit);
655 }.instrument(span);
656 if is_background {
657 executor.spawn_detached(handle_message);
658 } else {
659 foreground_message_handlers.push(handle_message);
660 }
661 } else {
662 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
663 }
664 } else {
665 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
666 break;
667 }
668 }
669 }
670 }
671
672 drop(foreground_message_handlers);
673 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
674 if let Err(error) = connection_lost(session, teardown, executor).await {
675 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
676 }
677
678 Ok(())
679 }.instrument(span)
680 }
681
682 pub async fn invite_code_redeemed(
683 self: &Arc<Self>,
684 inviter_id: UserId,
685 invitee_id: UserId,
686 ) -> Result<()> {
687 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
688 if let Some(code) = &user.invite_code {
689 let pool = self.connection_pool.lock();
690 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
691 for connection_id in pool.user_connection_ids(inviter_id) {
692 self.peer.send(
693 connection_id,
694 proto::UpdateContacts {
695 contacts: vec![invitee_contact.clone()],
696 ..Default::default()
697 },
698 )?;
699 self.peer.send(
700 connection_id,
701 proto::UpdateInviteInfo {
702 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
703 count: user.invite_count as u32,
704 },
705 )?;
706 }
707 }
708 }
709 Ok(())
710 }
711
712 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
713 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
714 if let Some(invite_code) = &user.invite_code {
715 let pool = self.connection_pool.lock();
716 for connection_id in pool.user_connection_ids(user_id) {
717 self.peer.send(
718 connection_id,
719 proto::UpdateInviteInfo {
720 url: format!(
721 "{}{}",
722 self.app_state.config.invite_link_prefix, invite_code
723 ),
724 count: user.invite_count as u32,
725 },
726 )?;
727 }
728 }
729 }
730 Ok(())
731 }
732
733 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
734 ServerSnapshot {
735 connection_pool: ConnectionPoolGuard {
736 guard: self.connection_pool.lock(),
737 _not_send: PhantomData,
738 },
739 peer: &self.peer,
740 }
741 }
742}
743
744impl<'a> Deref for ConnectionPoolGuard<'a> {
745 type Target = ConnectionPool;
746
747 fn deref(&self) -> &Self::Target {
748 &*self.guard
749 }
750}
751
752impl<'a> DerefMut for ConnectionPoolGuard<'a> {
753 fn deref_mut(&mut self) -> &mut Self::Target {
754 &mut *self.guard
755 }
756}
757
758impl<'a> Drop for ConnectionPoolGuard<'a> {
759 fn drop(&mut self) {
760 #[cfg(test)]
761 self.check_invariants();
762 }
763}
764
765fn broadcast<F>(
766 sender_id: Option<ConnectionId>,
767 receiver_ids: impl IntoIterator<Item = ConnectionId>,
768 mut f: F,
769) where
770 F: FnMut(ConnectionId) -> anyhow::Result<()>,
771{
772 for receiver_id in receiver_ids {
773 if Some(receiver_id) != sender_id {
774 if let Err(error) = f(receiver_id) {
775 tracing::error!("failed to send to {:?} {}", receiver_id, error);
776 }
777 }
778 }
779}
780
781lazy_static! {
782 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
783}
784
785pub struct ProtocolVersion(u32);
786
787impl Header for ProtocolVersion {
788 fn name() -> &'static HeaderName {
789 &ZED_PROTOCOL_VERSION
790 }
791
792 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
793 where
794 Self: Sized,
795 I: Iterator<Item = &'i axum::http::HeaderValue>,
796 {
797 let version = values
798 .next()
799 .ok_or_else(axum::headers::Error::invalid)?
800 .to_str()
801 .map_err(|_| axum::headers::Error::invalid())?
802 .parse()
803 .map_err(|_| axum::headers::Error::invalid())?;
804 Ok(Self(version))
805 }
806
807 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
808 values.extend([self.0.to_string().parse().unwrap()]);
809 }
810}
811
812pub fn routes(server: Arc<Server>) -> Router<Body> {
813 Router::new()
814 .route("/rpc", get(handle_websocket_request))
815 .layer(
816 ServiceBuilder::new()
817 .layer(Extension(server.app_state.clone()))
818 .layer(middleware::from_fn(auth::validate_header)),
819 )
820 .route("/metrics", get(handle_metrics))
821 .layer(Extension(server))
822}
823
824pub async fn handle_websocket_request(
825 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
826 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
827 Extension(server): Extension<Arc<Server>>,
828 Extension(user): Extension<User>,
829 ws: WebSocketUpgrade,
830) -> axum::response::Response {
831 if protocol_version != rpc::PROTOCOL_VERSION {
832 return (
833 StatusCode::UPGRADE_REQUIRED,
834 "client must be upgraded".to_string(),
835 )
836 .into_response();
837 }
838 let socket_address = socket_address.to_string();
839 ws.on_upgrade(move |socket| {
840 use util::ResultExt;
841 let socket = socket
842 .map_ok(to_tungstenite_message)
843 .err_into()
844 .with(|message| async move { Ok(to_axum_message(message)) });
845 let connection = Connection::new(Box::pin(socket));
846 async move {
847 server
848 .handle_connection(connection, socket_address, user, None, Executor::Production)
849 .await
850 .log_err();
851 }
852 })
853}
854
855pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
856 let connections = server
857 .connection_pool
858 .lock()
859 .connections()
860 .filter(|connection| !connection.admin)
861 .count();
862
863 METRIC_CONNECTIONS.set(connections as _);
864
865 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
866 METRIC_SHARED_PROJECTS.set(shared_projects as _);
867
868 let encoder = prometheus::TextEncoder::new();
869 let metric_families = prometheus::gather();
870 let encoded_metrics = encoder
871 .encode_to_string(&metric_families)
872 .map_err(|err| anyhow!("{}", err))?;
873 Ok(encoded_metrics)
874}
875
876#[instrument(err, skip(executor))]
877async fn connection_lost(
878 session: Session,
879 mut teardown: watch::Receiver<()>,
880 executor: Executor,
881) -> Result<()> {
882 session.peer.disconnect(session.connection_id);
883 session
884 .connection_pool()
885 .await
886 .remove_connection(session.connection_id)?;
887
888 session
889 .db()
890 .await
891 .connection_lost(session.connection_id)
892 .await
893 .trace_err();
894
895 futures::select_biased! {
896 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
897 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
898 leave_room_for_session(&session).await.trace_err();
899 leave_channel_buffers_for_session(&session)
900 .await
901 .trace_err();
902
903 if !session
904 .connection_pool()
905 .await
906 .is_user_online(session.user_id)
907 {
908 let db = session.db().await;
909 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
910 room_updated(&room, &session.peer);
911 }
912 }
913
914 update_user_contacts(session.user_id, &session).await?;
915 }
916 _ = teardown.changed().fuse() => {}
917 }
918
919 Ok(())
920}
921
922async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
923 response.send(proto::Ack {})?;
924 Ok(())
925}
926
927async fn create_room(
928 _request: proto::CreateRoom,
929 response: Response<proto::CreateRoom>,
930 session: Session,
931) -> Result<()> {
932 let live_kit_room = nanoid::nanoid!(30);
933
934 let live_kit_connection_info = {
935 let live_kit_room = live_kit_room.clone();
936 let live_kit = session.live_kit_client.as_ref();
937
938 util::async_iife!({
939 let live_kit = live_kit?;
940
941 let token = live_kit
942 .room_token(&live_kit_room, &session.user_id.to_string())
943 .trace_err()?;
944
945 Some(proto::LiveKitConnectionInfo {
946 server_url: live_kit.url().into(),
947 token,
948 })
949 })
950 }
951 .await;
952
953 let room = session
954 .db()
955 .await
956 .create_room(
957 session.user_id,
958 session.connection_id,
959 &live_kit_room,
960 RELEASE_CHANNEL_NAME.as_str(),
961 )
962 .await?;
963
964 response.send(proto::CreateRoomResponse {
965 room: Some(room.clone()),
966 live_kit_connection_info,
967 })?;
968
969 update_user_contacts(session.user_id, &session).await?;
970 Ok(())
971}
972
973async fn join_room(
974 request: proto::JoinRoom,
975 response: Response<proto::JoinRoom>,
976 session: Session,
977) -> Result<()> {
978 let room_id = RoomId::from_proto(request.id);
979 let joined_room = {
980 let room = session
981 .db()
982 .await
983 .join_room(
984 room_id,
985 session.user_id,
986 session.connection_id,
987 RELEASE_CHANNEL_NAME.as_str(),
988 )
989 .await?;
990 room_updated(&room.room, &session.peer);
991 room.into_inner()
992 };
993
994 if let Some(channel_id) = joined_room.channel_id {
995 channel_updated(
996 channel_id,
997 &joined_room.room,
998 &joined_room.channel_members,
999 &session.peer,
1000 &*session.connection_pool().await,
1001 )
1002 }
1003
1004 for connection_id in session
1005 .connection_pool()
1006 .await
1007 .user_connection_ids(session.user_id)
1008 {
1009 session
1010 .peer
1011 .send(
1012 connection_id,
1013 proto::CallCanceled {
1014 room_id: room_id.to_proto(),
1015 },
1016 )
1017 .trace_err();
1018 }
1019
1020 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1021 if let Some(token) = live_kit
1022 .room_token(
1023 &joined_room.room.live_kit_room,
1024 &session.user_id.to_string(),
1025 )
1026 .trace_err()
1027 {
1028 Some(proto::LiveKitConnectionInfo {
1029 server_url: live_kit.url().into(),
1030 token,
1031 })
1032 } else {
1033 None
1034 }
1035 } else {
1036 None
1037 };
1038
1039 response.send(proto::JoinRoomResponse {
1040 room: Some(joined_room.room),
1041 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1042 live_kit_connection_info,
1043 })?;
1044
1045 update_user_contacts(session.user_id, &session).await?;
1046 Ok(())
1047}
1048
1049async fn rejoin_room(
1050 request: proto::RejoinRoom,
1051 response: Response<proto::RejoinRoom>,
1052 session: Session,
1053) -> Result<()> {
1054 let room;
1055 let channel_id;
1056 let channel_members;
1057 {
1058 let mut rejoined_room = session
1059 .db()
1060 .await
1061 .rejoin_room(request, session.user_id, session.connection_id)
1062 .await?;
1063
1064 response.send(proto::RejoinRoomResponse {
1065 room: Some(rejoined_room.room.clone()),
1066 reshared_projects: rejoined_room
1067 .reshared_projects
1068 .iter()
1069 .map(|project| proto::ResharedProject {
1070 id: project.id.to_proto(),
1071 collaborators: project
1072 .collaborators
1073 .iter()
1074 .map(|collaborator| collaborator.to_proto())
1075 .collect(),
1076 })
1077 .collect(),
1078 rejoined_projects: rejoined_room
1079 .rejoined_projects
1080 .iter()
1081 .map(|rejoined_project| proto::RejoinedProject {
1082 id: rejoined_project.id.to_proto(),
1083 worktrees: rejoined_project
1084 .worktrees
1085 .iter()
1086 .map(|worktree| proto::WorktreeMetadata {
1087 id: worktree.id,
1088 root_name: worktree.root_name.clone(),
1089 visible: worktree.visible,
1090 abs_path: worktree.abs_path.clone(),
1091 })
1092 .collect(),
1093 collaborators: rejoined_project
1094 .collaborators
1095 .iter()
1096 .map(|collaborator| collaborator.to_proto())
1097 .collect(),
1098 language_servers: rejoined_project.language_servers.clone(),
1099 })
1100 .collect(),
1101 })?;
1102 room_updated(&rejoined_room.room, &session.peer);
1103
1104 for project in &rejoined_room.reshared_projects {
1105 for collaborator in &project.collaborators {
1106 session
1107 .peer
1108 .send(
1109 collaborator.connection_id,
1110 proto::UpdateProjectCollaborator {
1111 project_id: project.id.to_proto(),
1112 old_peer_id: Some(project.old_connection_id.into()),
1113 new_peer_id: Some(session.connection_id.into()),
1114 },
1115 )
1116 .trace_err();
1117 }
1118
1119 broadcast(
1120 Some(session.connection_id),
1121 project
1122 .collaborators
1123 .iter()
1124 .map(|collaborator| collaborator.connection_id),
1125 |connection_id| {
1126 session.peer.forward_send(
1127 session.connection_id,
1128 connection_id,
1129 proto::UpdateProject {
1130 project_id: project.id.to_proto(),
1131 worktrees: project.worktrees.clone(),
1132 },
1133 )
1134 },
1135 );
1136 }
1137
1138 for project in &rejoined_room.rejoined_projects {
1139 for collaborator in &project.collaborators {
1140 session
1141 .peer
1142 .send(
1143 collaborator.connection_id,
1144 proto::UpdateProjectCollaborator {
1145 project_id: project.id.to_proto(),
1146 old_peer_id: Some(project.old_connection_id.into()),
1147 new_peer_id: Some(session.connection_id.into()),
1148 },
1149 )
1150 .trace_err();
1151 }
1152 }
1153
1154 for project in &mut rejoined_room.rejoined_projects {
1155 for worktree in mem::take(&mut project.worktrees) {
1156 #[cfg(any(test, feature = "test-support"))]
1157 const MAX_CHUNK_SIZE: usize = 2;
1158 #[cfg(not(any(test, feature = "test-support")))]
1159 const MAX_CHUNK_SIZE: usize = 256;
1160
1161 // Stream this worktree's entries.
1162 let message = proto::UpdateWorktree {
1163 project_id: project.id.to_proto(),
1164 worktree_id: worktree.id,
1165 abs_path: worktree.abs_path.clone(),
1166 root_name: worktree.root_name,
1167 updated_entries: worktree.updated_entries,
1168 removed_entries: worktree.removed_entries,
1169 scan_id: worktree.scan_id,
1170 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1171 updated_repositories: worktree.updated_repositories,
1172 removed_repositories: worktree.removed_repositories,
1173 };
1174 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1175 session.peer.send(session.connection_id, update.clone())?;
1176 }
1177
1178 // Stream this worktree's diagnostics.
1179 for summary in worktree.diagnostic_summaries {
1180 session.peer.send(
1181 session.connection_id,
1182 proto::UpdateDiagnosticSummary {
1183 project_id: project.id.to_proto(),
1184 worktree_id: worktree.id,
1185 summary: Some(summary),
1186 },
1187 )?;
1188 }
1189
1190 for settings_file in worktree.settings_files {
1191 session.peer.send(
1192 session.connection_id,
1193 proto::UpdateWorktreeSettings {
1194 project_id: project.id.to_proto(),
1195 worktree_id: worktree.id,
1196 path: settings_file.path,
1197 content: Some(settings_file.content),
1198 },
1199 )?;
1200 }
1201 }
1202
1203 for language_server in &project.language_servers {
1204 session.peer.send(
1205 session.connection_id,
1206 proto::UpdateLanguageServer {
1207 project_id: project.id.to_proto(),
1208 language_server_id: language_server.id,
1209 variant: Some(
1210 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1211 proto::LspDiskBasedDiagnosticsUpdated {},
1212 ),
1213 ),
1214 },
1215 )?;
1216 }
1217 }
1218
1219 let rejoined_room = rejoined_room.into_inner();
1220
1221 room = rejoined_room.room;
1222 channel_id = rejoined_room.channel_id;
1223 channel_members = rejoined_room.channel_members;
1224 }
1225
1226 if let Some(channel_id) = channel_id {
1227 channel_updated(
1228 channel_id,
1229 &room,
1230 &channel_members,
1231 &session.peer,
1232 &*session.connection_pool().await,
1233 );
1234 }
1235
1236 update_user_contacts(session.user_id, &session).await?;
1237 Ok(())
1238}
1239
1240async fn leave_room(
1241 _: proto::LeaveRoom,
1242 response: Response<proto::LeaveRoom>,
1243 session: Session,
1244) -> Result<()> {
1245 leave_room_for_session(&session).await?;
1246 response.send(proto::Ack {})?;
1247 Ok(())
1248}
1249
1250async fn call(
1251 request: proto::Call,
1252 response: Response<proto::Call>,
1253 session: Session,
1254) -> Result<()> {
1255 let room_id = RoomId::from_proto(request.room_id);
1256 let calling_user_id = session.user_id;
1257 let calling_connection_id = session.connection_id;
1258 let called_user_id = UserId::from_proto(request.called_user_id);
1259 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1260 if !session
1261 .db()
1262 .await
1263 .has_contact(calling_user_id, called_user_id)
1264 .await?
1265 {
1266 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1267 }
1268
1269 let incoming_call = {
1270 let (room, incoming_call) = &mut *session
1271 .db()
1272 .await
1273 .call(
1274 room_id,
1275 calling_user_id,
1276 calling_connection_id,
1277 called_user_id,
1278 initial_project_id,
1279 )
1280 .await?;
1281 room_updated(&room, &session.peer);
1282 mem::take(incoming_call)
1283 };
1284 update_user_contacts(called_user_id, &session).await?;
1285
1286 let mut calls = session
1287 .connection_pool()
1288 .await
1289 .user_connection_ids(called_user_id)
1290 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1291 .collect::<FuturesUnordered<_>>();
1292
1293 while let Some(call_response) = calls.next().await {
1294 match call_response.as_ref() {
1295 Ok(_) => {
1296 response.send(proto::Ack {})?;
1297 return Ok(());
1298 }
1299 Err(_) => {
1300 call_response.trace_err();
1301 }
1302 }
1303 }
1304
1305 {
1306 let room = session
1307 .db()
1308 .await
1309 .call_failed(room_id, called_user_id)
1310 .await?;
1311 room_updated(&room, &session.peer);
1312 }
1313 update_user_contacts(called_user_id, &session).await?;
1314
1315 Err(anyhow!("failed to ring user"))?
1316}
1317
1318async fn cancel_call(
1319 request: proto::CancelCall,
1320 response: Response<proto::CancelCall>,
1321 session: Session,
1322) -> Result<()> {
1323 let called_user_id = UserId::from_proto(request.called_user_id);
1324 let room_id = RoomId::from_proto(request.room_id);
1325 {
1326 let room = session
1327 .db()
1328 .await
1329 .cancel_call(room_id, session.connection_id, called_user_id)
1330 .await?;
1331 room_updated(&room, &session.peer);
1332 }
1333
1334 for connection_id in session
1335 .connection_pool()
1336 .await
1337 .user_connection_ids(called_user_id)
1338 {
1339 session
1340 .peer
1341 .send(
1342 connection_id,
1343 proto::CallCanceled {
1344 room_id: room_id.to_proto(),
1345 },
1346 )
1347 .trace_err();
1348 }
1349 response.send(proto::Ack {})?;
1350
1351 update_user_contacts(called_user_id, &session).await?;
1352 Ok(())
1353}
1354
1355async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1356 let room_id = RoomId::from_proto(message.room_id);
1357 {
1358 let room = session
1359 .db()
1360 .await
1361 .decline_call(Some(room_id), session.user_id)
1362 .await?
1363 .ok_or_else(|| anyhow!("failed to decline call"))?;
1364 room_updated(&room, &session.peer);
1365 }
1366
1367 for connection_id in session
1368 .connection_pool()
1369 .await
1370 .user_connection_ids(session.user_id)
1371 {
1372 session
1373 .peer
1374 .send(
1375 connection_id,
1376 proto::CallCanceled {
1377 room_id: room_id.to_proto(),
1378 },
1379 )
1380 .trace_err();
1381 }
1382 update_user_contacts(session.user_id, &session).await?;
1383 Ok(())
1384}
1385
1386async fn update_participant_location(
1387 request: proto::UpdateParticipantLocation,
1388 response: Response<proto::UpdateParticipantLocation>,
1389 session: Session,
1390) -> Result<()> {
1391 let room_id = RoomId::from_proto(request.room_id);
1392 let location = request
1393 .location
1394 .ok_or_else(|| anyhow!("invalid location"))?;
1395
1396 let db = session.db().await;
1397 let room = db
1398 .update_room_participant_location(room_id, session.connection_id, location)
1399 .await?;
1400
1401 room_updated(&room, &session.peer);
1402 response.send(proto::Ack {})?;
1403 Ok(())
1404}
1405
1406async fn share_project(
1407 request: proto::ShareProject,
1408 response: Response<proto::ShareProject>,
1409 session: Session,
1410) -> Result<()> {
1411 let (project_id, room) = &*session
1412 .db()
1413 .await
1414 .share_project(
1415 RoomId::from_proto(request.room_id),
1416 session.connection_id,
1417 &request.worktrees,
1418 )
1419 .await?;
1420 response.send(proto::ShareProjectResponse {
1421 project_id: project_id.to_proto(),
1422 })?;
1423 room_updated(&room, &session.peer);
1424
1425 Ok(())
1426}
1427
1428async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1429 let project_id = ProjectId::from_proto(message.project_id);
1430
1431 let (room, guest_connection_ids) = &*session
1432 .db()
1433 .await
1434 .unshare_project(project_id, session.connection_id)
1435 .await?;
1436
1437 broadcast(
1438 Some(session.connection_id),
1439 guest_connection_ids.iter().copied(),
1440 |conn_id| session.peer.send(conn_id, message.clone()),
1441 );
1442 room_updated(&room, &session.peer);
1443
1444 Ok(())
1445}
1446
1447async fn join_project(
1448 request: proto::JoinProject,
1449 response: Response<proto::JoinProject>,
1450 session: Session,
1451) -> Result<()> {
1452 let project_id = ProjectId::from_proto(request.project_id);
1453 let guest_user_id = session.user_id;
1454
1455 tracing::info!(%project_id, "join project");
1456
1457 let (project, replica_id) = &mut *session
1458 .db()
1459 .await
1460 .join_project(project_id, session.connection_id)
1461 .await?;
1462
1463 let collaborators = project
1464 .collaborators
1465 .iter()
1466 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1467 .map(|collaborator| collaborator.to_proto())
1468 .collect::<Vec<_>>();
1469
1470 let worktrees = project
1471 .worktrees
1472 .iter()
1473 .map(|(id, worktree)| proto::WorktreeMetadata {
1474 id: *id,
1475 root_name: worktree.root_name.clone(),
1476 visible: worktree.visible,
1477 abs_path: worktree.abs_path.clone(),
1478 })
1479 .collect::<Vec<_>>();
1480
1481 for collaborator in &collaborators {
1482 session
1483 .peer
1484 .send(
1485 collaborator.peer_id.unwrap().into(),
1486 proto::AddProjectCollaborator {
1487 project_id: project_id.to_proto(),
1488 collaborator: Some(proto::Collaborator {
1489 peer_id: Some(session.connection_id.into()),
1490 replica_id: replica_id.0 as u32,
1491 user_id: guest_user_id.to_proto(),
1492 }),
1493 },
1494 )
1495 .trace_err();
1496 }
1497
1498 // First, we send the metadata associated with each worktree.
1499 response.send(proto::JoinProjectResponse {
1500 worktrees: worktrees.clone(),
1501 replica_id: replica_id.0 as u32,
1502 collaborators: collaborators.clone(),
1503 language_servers: project.language_servers.clone(),
1504 })?;
1505
1506 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1507 #[cfg(any(test, feature = "test-support"))]
1508 const MAX_CHUNK_SIZE: usize = 2;
1509 #[cfg(not(any(test, feature = "test-support")))]
1510 const MAX_CHUNK_SIZE: usize = 256;
1511
1512 // Stream this worktree's entries.
1513 let message = proto::UpdateWorktree {
1514 project_id: project_id.to_proto(),
1515 worktree_id,
1516 abs_path: worktree.abs_path.clone(),
1517 root_name: worktree.root_name,
1518 updated_entries: worktree.entries,
1519 removed_entries: Default::default(),
1520 scan_id: worktree.scan_id,
1521 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1522 updated_repositories: worktree.repository_entries.into_values().collect(),
1523 removed_repositories: Default::default(),
1524 };
1525 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1526 session.peer.send(session.connection_id, update.clone())?;
1527 }
1528
1529 // Stream this worktree's diagnostics.
1530 for summary in worktree.diagnostic_summaries {
1531 session.peer.send(
1532 session.connection_id,
1533 proto::UpdateDiagnosticSummary {
1534 project_id: project_id.to_proto(),
1535 worktree_id: worktree.id,
1536 summary: Some(summary),
1537 },
1538 )?;
1539 }
1540
1541 for settings_file in worktree.settings_files {
1542 session.peer.send(
1543 session.connection_id,
1544 proto::UpdateWorktreeSettings {
1545 project_id: project_id.to_proto(),
1546 worktree_id: worktree.id,
1547 path: settings_file.path,
1548 content: Some(settings_file.content),
1549 },
1550 )?;
1551 }
1552 }
1553
1554 for language_server in &project.language_servers {
1555 session.peer.send(
1556 session.connection_id,
1557 proto::UpdateLanguageServer {
1558 project_id: project_id.to_proto(),
1559 language_server_id: language_server.id,
1560 variant: Some(
1561 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1562 proto::LspDiskBasedDiagnosticsUpdated {},
1563 ),
1564 ),
1565 },
1566 )?;
1567 }
1568
1569 Ok(())
1570}
1571
1572async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1573 let sender_id = session.connection_id;
1574 let project_id = ProjectId::from_proto(request.project_id);
1575
1576 let (room, project) = &*session
1577 .db()
1578 .await
1579 .leave_project(project_id, sender_id)
1580 .await?;
1581 tracing::info!(
1582 %project_id,
1583 host_user_id = %project.host_user_id,
1584 host_connection_id = %project.host_connection_id,
1585 "leave project"
1586 );
1587
1588 project_left(&project, &session);
1589 room_updated(&room, &session.peer);
1590
1591 Ok(())
1592}
1593
1594async fn update_project(
1595 request: proto::UpdateProject,
1596 response: Response<proto::UpdateProject>,
1597 session: Session,
1598) -> Result<()> {
1599 let project_id = ProjectId::from_proto(request.project_id);
1600 let (room, guest_connection_ids) = &*session
1601 .db()
1602 .await
1603 .update_project(project_id, session.connection_id, &request.worktrees)
1604 .await?;
1605 broadcast(
1606 Some(session.connection_id),
1607 guest_connection_ids.iter().copied(),
1608 |connection_id| {
1609 session
1610 .peer
1611 .forward_send(session.connection_id, connection_id, request.clone())
1612 },
1613 );
1614 room_updated(&room, &session.peer);
1615 response.send(proto::Ack {})?;
1616
1617 Ok(())
1618}
1619
1620async fn update_worktree(
1621 request: proto::UpdateWorktree,
1622 response: Response<proto::UpdateWorktree>,
1623 session: Session,
1624) -> Result<()> {
1625 let guest_connection_ids = session
1626 .db()
1627 .await
1628 .update_worktree(&request, session.connection_id)
1629 .await?;
1630
1631 broadcast(
1632 Some(session.connection_id),
1633 guest_connection_ids.iter().copied(),
1634 |connection_id| {
1635 session
1636 .peer
1637 .forward_send(session.connection_id, connection_id, request.clone())
1638 },
1639 );
1640 response.send(proto::Ack {})?;
1641 Ok(())
1642}
1643
1644async fn update_diagnostic_summary(
1645 message: proto::UpdateDiagnosticSummary,
1646 session: Session,
1647) -> Result<()> {
1648 let guest_connection_ids = session
1649 .db()
1650 .await
1651 .update_diagnostic_summary(&message, session.connection_id)
1652 .await?;
1653
1654 broadcast(
1655 Some(session.connection_id),
1656 guest_connection_ids.iter().copied(),
1657 |connection_id| {
1658 session
1659 .peer
1660 .forward_send(session.connection_id, connection_id, message.clone())
1661 },
1662 );
1663
1664 Ok(())
1665}
1666
1667async fn update_worktree_settings(
1668 message: proto::UpdateWorktreeSettings,
1669 session: Session,
1670) -> Result<()> {
1671 let guest_connection_ids = session
1672 .db()
1673 .await
1674 .update_worktree_settings(&message, session.connection_id)
1675 .await?;
1676
1677 broadcast(
1678 Some(session.connection_id),
1679 guest_connection_ids.iter().copied(),
1680 |connection_id| {
1681 session
1682 .peer
1683 .forward_send(session.connection_id, connection_id, message.clone())
1684 },
1685 );
1686
1687 Ok(())
1688}
1689
1690async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1691 broadcast_project_message(request.project_id, request, session).await
1692}
1693
1694async fn start_language_server(
1695 request: proto::StartLanguageServer,
1696 session: Session,
1697) -> Result<()> {
1698 let guest_connection_ids = session
1699 .db()
1700 .await
1701 .start_language_server(&request, session.connection_id)
1702 .await?;
1703
1704 broadcast(
1705 Some(session.connection_id),
1706 guest_connection_ids.iter().copied(),
1707 |connection_id| {
1708 session
1709 .peer
1710 .forward_send(session.connection_id, connection_id, request.clone())
1711 },
1712 );
1713 Ok(())
1714}
1715
1716async fn update_language_server(
1717 request: proto::UpdateLanguageServer,
1718 session: Session,
1719) -> Result<()> {
1720 session.executor.record_backtrace();
1721 let project_id = ProjectId::from_proto(request.project_id);
1722 let project_connection_ids = session
1723 .db()
1724 .await
1725 .project_connection_ids(project_id, session.connection_id)
1726 .await?;
1727 broadcast(
1728 Some(session.connection_id),
1729 project_connection_ids.iter().copied(),
1730 |connection_id| {
1731 session
1732 .peer
1733 .forward_send(session.connection_id, connection_id, request.clone())
1734 },
1735 );
1736 Ok(())
1737}
1738
1739async fn forward_project_request<T>(
1740 request: T,
1741 response: Response<T>,
1742 session: Session,
1743) -> Result<()>
1744where
1745 T: EntityMessage + RequestMessage,
1746{
1747 session.executor.record_backtrace();
1748 let project_id = ProjectId::from_proto(request.remote_entity_id());
1749 let host_connection_id = {
1750 let collaborators = session
1751 .db()
1752 .await
1753 .project_collaborators(project_id, session.connection_id)
1754 .await?;
1755 collaborators
1756 .iter()
1757 .find(|collaborator| collaborator.is_host)
1758 .ok_or_else(|| anyhow!("host not found"))?
1759 .connection_id
1760 };
1761
1762 let payload = session
1763 .peer
1764 .forward_request(session.connection_id, host_connection_id, request)
1765 .await?;
1766
1767 response.send(payload)?;
1768 Ok(())
1769}
1770
1771async fn create_buffer_for_peer(
1772 request: proto::CreateBufferForPeer,
1773 session: Session,
1774) -> Result<()> {
1775 session.executor.record_backtrace();
1776 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1777 session
1778 .peer
1779 .forward_send(session.connection_id, peer_id.into(), request)?;
1780 Ok(())
1781}
1782
1783async fn update_buffer(
1784 request: proto::UpdateBuffer,
1785 response: Response<proto::UpdateBuffer>,
1786 session: Session,
1787) -> Result<()> {
1788 session.executor.record_backtrace();
1789 let project_id = ProjectId::from_proto(request.project_id);
1790 let mut guest_connection_ids;
1791 let mut host_connection_id = None;
1792 {
1793 let collaborators = session
1794 .db()
1795 .await
1796 .project_collaborators(project_id, session.connection_id)
1797 .await?;
1798 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1799 for collaborator in collaborators.iter() {
1800 if collaborator.is_host {
1801 host_connection_id = Some(collaborator.connection_id);
1802 } else {
1803 guest_connection_ids.push(collaborator.connection_id);
1804 }
1805 }
1806 }
1807 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1808
1809 session.executor.record_backtrace();
1810 broadcast(
1811 Some(session.connection_id),
1812 guest_connection_ids,
1813 |connection_id| {
1814 session
1815 .peer
1816 .forward_send(session.connection_id, connection_id, request.clone())
1817 },
1818 );
1819 if host_connection_id != session.connection_id {
1820 session
1821 .peer
1822 .forward_request(session.connection_id, host_connection_id, request.clone())
1823 .await?;
1824 }
1825
1826 response.send(proto::Ack {})?;
1827 Ok(())
1828}
1829
1830async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1831 let project_id = ProjectId::from_proto(request.project_id);
1832 let project_connection_ids = session
1833 .db()
1834 .await
1835 .project_connection_ids(project_id, session.connection_id)
1836 .await?;
1837
1838 broadcast(
1839 Some(session.connection_id),
1840 project_connection_ids.iter().copied(),
1841 |connection_id| {
1842 session
1843 .peer
1844 .forward_send(session.connection_id, connection_id, request.clone())
1845 },
1846 );
1847 Ok(())
1848}
1849
1850async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1851 let project_id = ProjectId::from_proto(request.project_id);
1852 let project_connection_ids = session
1853 .db()
1854 .await
1855 .project_connection_ids(project_id, session.connection_id)
1856 .await?;
1857 broadcast(
1858 Some(session.connection_id),
1859 project_connection_ids.iter().copied(),
1860 |connection_id| {
1861 session
1862 .peer
1863 .forward_send(session.connection_id, connection_id, request.clone())
1864 },
1865 );
1866 Ok(())
1867}
1868
1869async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1870 broadcast_project_message(request.project_id, request, session).await
1871}
1872
1873async fn broadcast_project_message<T: EnvelopedMessage>(
1874 project_id: u64,
1875 request: T,
1876 session: Session,
1877) -> Result<()> {
1878 let project_id = ProjectId::from_proto(project_id);
1879 let project_connection_ids = session
1880 .db()
1881 .await
1882 .project_connection_ids(project_id, session.connection_id)
1883 .await?;
1884 broadcast(
1885 Some(session.connection_id),
1886 project_connection_ids.iter().copied(),
1887 |connection_id| {
1888 session
1889 .peer
1890 .forward_send(session.connection_id, connection_id, request.clone())
1891 },
1892 );
1893 Ok(())
1894}
1895
1896async fn follow(
1897 request: proto::Follow,
1898 response: Response<proto::Follow>,
1899 session: Session,
1900) -> Result<()> {
1901 let room_id = RoomId::from_proto(request.room_id);
1902 let project_id = request.project_id.map(ProjectId::from_proto);
1903 let leader_id = request
1904 .leader_id
1905 .ok_or_else(|| anyhow!("invalid leader id"))?
1906 .into();
1907 let follower_id = session.connection_id;
1908
1909 session
1910 .db()
1911 .await
1912 .check_room_participants(room_id, leader_id, session.connection_id)
1913 .await?;
1914
1915 let response_payload = session
1916 .peer
1917 .forward_request(session.connection_id, leader_id, request)
1918 .await?;
1919 response.send(response_payload)?;
1920
1921 if let Some(project_id) = project_id {
1922 let room = session
1923 .db()
1924 .await
1925 .follow(room_id, project_id, leader_id, follower_id)
1926 .await?;
1927 room_updated(&room, &session.peer);
1928 }
1929
1930 Ok(())
1931}
1932
1933async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1934 let room_id = RoomId::from_proto(request.room_id);
1935 let project_id = request.project_id.map(ProjectId::from_proto);
1936 let leader_id = request
1937 .leader_id
1938 .ok_or_else(|| anyhow!("invalid leader id"))?
1939 .into();
1940 let follower_id = session.connection_id;
1941
1942 session
1943 .db()
1944 .await
1945 .check_room_participants(room_id, leader_id, session.connection_id)
1946 .await?;
1947
1948 session
1949 .peer
1950 .forward_send(session.connection_id, leader_id, request)?;
1951
1952 if let Some(project_id) = project_id {
1953 let room = session
1954 .db()
1955 .await
1956 .unfollow(room_id, project_id, leader_id, follower_id)
1957 .await?;
1958 room_updated(&room, &session.peer);
1959 }
1960
1961 Ok(())
1962}
1963
1964async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1965 let room_id = RoomId::from_proto(request.room_id);
1966 let database = session.db.lock().await;
1967
1968 let connection_ids = if let Some(project_id) = request.project_id {
1969 let project_id = ProjectId::from_proto(project_id);
1970 database
1971 .project_connection_ids(project_id, session.connection_id)
1972 .await?
1973 } else {
1974 database
1975 .room_connection_ids(room_id, session.connection_id)
1976 .await?
1977 };
1978
1979 // For now, don't send view update messages back to that view's current leader.
1980 let connection_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
1981 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1982 _ => None,
1983 });
1984
1985 for follower_peer_id in request.follower_ids.iter().copied() {
1986 let follower_connection_id = follower_peer_id.into();
1987 if Some(follower_peer_id) != connection_id_to_omit
1988 && connection_ids.contains(&follower_connection_id)
1989 {
1990 session.peer.forward_send(
1991 session.connection_id,
1992 follower_connection_id,
1993 request.clone(),
1994 )?;
1995 }
1996 }
1997 Ok(())
1998}
1999
2000async fn get_users(
2001 request: proto::GetUsers,
2002 response: Response<proto::GetUsers>,
2003 session: Session,
2004) -> Result<()> {
2005 let user_ids = request
2006 .user_ids
2007 .into_iter()
2008 .map(UserId::from_proto)
2009 .collect();
2010 let users = session
2011 .db()
2012 .await
2013 .get_users_by_ids(user_ids)
2014 .await?
2015 .into_iter()
2016 .map(|user| proto::User {
2017 id: user.id.to_proto(),
2018 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2019 github_login: user.github_login,
2020 })
2021 .collect();
2022 response.send(proto::UsersResponse { users })?;
2023 Ok(())
2024}
2025
2026async fn fuzzy_search_users(
2027 request: proto::FuzzySearchUsers,
2028 response: Response<proto::FuzzySearchUsers>,
2029 session: Session,
2030) -> Result<()> {
2031 let query = request.query;
2032 let users = match query.len() {
2033 0 => vec![],
2034 1 | 2 => session
2035 .db()
2036 .await
2037 .get_user_by_github_login(&query)
2038 .await?
2039 .into_iter()
2040 .collect(),
2041 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2042 };
2043 let users = users
2044 .into_iter()
2045 .filter(|user| user.id != session.user_id)
2046 .map(|user| proto::User {
2047 id: user.id.to_proto(),
2048 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2049 github_login: user.github_login,
2050 })
2051 .collect();
2052 response.send(proto::UsersResponse { users })?;
2053 Ok(())
2054}
2055
2056async fn request_contact(
2057 request: proto::RequestContact,
2058 response: Response<proto::RequestContact>,
2059 session: Session,
2060) -> Result<()> {
2061 let requester_id = session.user_id;
2062 let responder_id = UserId::from_proto(request.responder_id);
2063 if requester_id == responder_id {
2064 return Err(anyhow!("cannot add yourself as a contact"))?;
2065 }
2066
2067 session
2068 .db()
2069 .await
2070 .send_contact_request(requester_id, responder_id)
2071 .await?;
2072
2073 // Update outgoing contact requests of requester
2074 let mut update = proto::UpdateContacts::default();
2075 update.outgoing_requests.push(responder_id.to_proto());
2076 for connection_id in session
2077 .connection_pool()
2078 .await
2079 .user_connection_ids(requester_id)
2080 {
2081 session.peer.send(connection_id, update.clone())?;
2082 }
2083
2084 // Update incoming contact requests of responder
2085 let mut update = proto::UpdateContacts::default();
2086 update
2087 .incoming_requests
2088 .push(proto::IncomingContactRequest {
2089 requester_id: requester_id.to_proto(),
2090 should_notify: true,
2091 });
2092 for connection_id in session
2093 .connection_pool()
2094 .await
2095 .user_connection_ids(responder_id)
2096 {
2097 session.peer.send(connection_id, update.clone())?;
2098 }
2099
2100 response.send(proto::Ack {})?;
2101 Ok(())
2102}
2103
2104async fn respond_to_contact_request(
2105 request: proto::RespondToContactRequest,
2106 response: Response<proto::RespondToContactRequest>,
2107 session: Session,
2108) -> Result<()> {
2109 let responder_id = session.user_id;
2110 let requester_id = UserId::from_proto(request.requester_id);
2111 let db = session.db().await;
2112 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2113 db.dismiss_contact_notification(responder_id, requester_id)
2114 .await?;
2115 } else {
2116 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2117
2118 db.respond_to_contact_request(responder_id, requester_id, accept)
2119 .await?;
2120 let requester_busy = db.is_user_busy(requester_id).await?;
2121 let responder_busy = db.is_user_busy(responder_id).await?;
2122
2123 let pool = session.connection_pool().await;
2124 // Update responder with new contact
2125 let mut update = proto::UpdateContacts::default();
2126 if accept {
2127 update
2128 .contacts
2129 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2130 }
2131 update
2132 .remove_incoming_requests
2133 .push(requester_id.to_proto());
2134 for connection_id in pool.user_connection_ids(responder_id) {
2135 session.peer.send(connection_id, update.clone())?;
2136 }
2137
2138 // Update requester with new contact
2139 let mut update = proto::UpdateContacts::default();
2140 if accept {
2141 update
2142 .contacts
2143 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2144 }
2145 update
2146 .remove_outgoing_requests
2147 .push(responder_id.to_proto());
2148 for connection_id in pool.user_connection_ids(requester_id) {
2149 session.peer.send(connection_id, update.clone())?;
2150 }
2151 }
2152
2153 response.send(proto::Ack {})?;
2154 Ok(())
2155}
2156
2157async fn remove_contact(
2158 request: proto::RemoveContact,
2159 response: Response<proto::RemoveContact>,
2160 session: Session,
2161) -> Result<()> {
2162 let requester_id = session.user_id;
2163 let responder_id = UserId::from_proto(request.user_id);
2164 let db = session.db().await;
2165 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2166
2167 let pool = session.connection_pool().await;
2168 // Update outgoing contact requests of requester
2169 let mut update = proto::UpdateContacts::default();
2170 if contact_accepted {
2171 update.remove_contacts.push(responder_id.to_proto());
2172 } else {
2173 update
2174 .remove_outgoing_requests
2175 .push(responder_id.to_proto());
2176 }
2177 for connection_id in pool.user_connection_ids(requester_id) {
2178 session.peer.send(connection_id, update.clone())?;
2179 }
2180
2181 // Update incoming contact requests of responder
2182 let mut update = proto::UpdateContacts::default();
2183 if contact_accepted {
2184 update.remove_contacts.push(requester_id.to_proto());
2185 } else {
2186 update
2187 .remove_incoming_requests
2188 .push(requester_id.to_proto());
2189 }
2190 for connection_id in pool.user_connection_ids(responder_id) {
2191 session.peer.send(connection_id, update.clone())?;
2192 }
2193
2194 response.send(proto::Ack {})?;
2195 Ok(())
2196}
2197
2198async fn create_channel(
2199 request: proto::CreateChannel,
2200 response: Response<proto::CreateChannel>,
2201 session: Session,
2202) -> Result<()> {
2203 let db = session.db().await;
2204
2205 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2206 let id = db
2207 .create_channel(&request.name, parent_id, session.user_id)
2208 .await?;
2209
2210 let channel = proto::Channel {
2211 id: id.to_proto(),
2212 name: request.name,
2213 // TODO: Visibility
2214 visibility: proto::ChannelVisibility::ChannelMembers as i32,
2215 };
2216
2217 response.send(proto::CreateChannelResponse {
2218 channel: Some(channel.clone()),
2219 parent_id: request.parent_id,
2220 })?;
2221
2222 let Some(parent_id) = parent_id else {
2223 return Ok(());
2224 };
2225
2226 let update = proto::UpdateChannels {
2227 channels: vec![channel],
2228 insert_edge: vec![ChannelEdge {
2229 parent_id: parent_id.to_proto(),
2230 channel_id: id.to_proto(),
2231 }],
2232 ..Default::default()
2233 };
2234
2235 let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2236
2237 let connection_pool = session.connection_pool().await;
2238 for user_id in user_ids_to_notify {
2239 for connection_id in connection_pool.user_connection_ids(user_id) {
2240 if user_id == session.user_id {
2241 continue;
2242 }
2243 session.peer.send(connection_id, update.clone())?;
2244 }
2245 }
2246
2247 Ok(())
2248}
2249
2250async fn delete_channel(
2251 request: proto::DeleteChannel,
2252 response: Response<proto::DeleteChannel>,
2253 session: Session,
2254) -> Result<()> {
2255 let db = session.db().await;
2256
2257 let channel_id = request.channel_id;
2258 let (removed_channels, member_ids) = db
2259 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2260 .await?;
2261 response.send(proto::Ack {})?;
2262
2263 // Notify members of removed channels
2264 let mut update = proto::UpdateChannels::default();
2265 update
2266 .delete_channels
2267 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2268
2269 let connection_pool = session.connection_pool().await;
2270 for member_id in member_ids {
2271 for connection_id in connection_pool.user_connection_ids(member_id) {
2272 session.peer.send(connection_id, update.clone())?;
2273 }
2274 }
2275
2276 Ok(())
2277}
2278
2279async fn invite_channel_member(
2280 request: proto::InviteChannelMember,
2281 response: Response<proto::InviteChannelMember>,
2282 session: Session,
2283) -> Result<()> {
2284 let db = session.db().await;
2285 let channel_id = ChannelId::from_proto(request.channel_id);
2286 let invitee_id = UserId::from_proto(request.user_id);
2287 db.invite_channel_member(
2288 channel_id,
2289 invitee_id,
2290 session.user_id,
2291 request.role().into(),
2292 )
2293 .await?;
2294
2295 let (channel, _) = db
2296 .get_channel(channel_id, session.user_id)
2297 .await?
2298 .ok_or_else(|| anyhow!("channel not found"))?;
2299
2300 let mut update = proto::UpdateChannels::default();
2301 update.channel_invitations.push(proto::Channel {
2302 id: channel.id.to_proto(),
2303 name: channel.name,
2304 // TODO: Visibility
2305 visibility: proto::ChannelVisibility::ChannelMembers as i32,
2306 });
2307 for connection_id in session
2308 .connection_pool()
2309 .await
2310 .user_connection_ids(invitee_id)
2311 {
2312 session.peer.send(connection_id, update.clone())?;
2313 }
2314
2315 response.send(proto::Ack {})?;
2316 Ok(())
2317}
2318
2319async fn remove_channel_member(
2320 request: proto::RemoveChannelMember,
2321 response: Response<proto::RemoveChannelMember>,
2322 session: Session,
2323) -> Result<()> {
2324 let db = session.db().await;
2325 let channel_id = ChannelId::from_proto(request.channel_id);
2326 let member_id = UserId::from_proto(request.user_id);
2327
2328 db.remove_channel_member(channel_id, member_id, session.user_id)
2329 .await?;
2330
2331 let mut update = proto::UpdateChannels::default();
2332 update.delete_channels.push(channel_id.to_proto());
2333
2334 for connection_id in session
2335 .connection_pool()
2336 .await
2337 .user_connection_ids(member_id)
2338 {
2339 session.peer.send(connection_id, update.clone())?;
2340 }
2341
2342 response.send(proto::Ack {})?;
2343 Ok(())
2344}
2345
2346async fn set_channel_member_role(
2347 request: proto::SetChannelMemberRole,
2348 response: Response<proto::SetChannelMemberRole>,
2349 session: Session,
2350) -> Result<()> {
2351 let db = session.db().await;
2352 let channel_id = ChannelId::from_proto(request.channel_id);
2353 let member_id = UserId::from_proto(request.user_id);
2354 db.set_channel_member_role(
2355 channel_id,
2356 session.user_id,
2357 member_id,
2358 request.role().into(),
2359 )
2360 .await?;
2361
2362 let (channel, has_accepted) = db
2363 .get_channel(channel_id, member_id)
2364 .await?
2365 .ok_or_else(|| anyhow!("channel not found"))?;
2366
2367 let mut update = proto::UpdateChannels::default();
2368 if has_accepted {
2369 update.channel_permissions.push(proto::ChannelPermission {
2370 channel_id: channel.id.to_proto(),
2371 role: request.role,
2372 });
2373 }
2374
2375 for connection_id in session
2376 .connection_pool()
2377 .await
2378 .user_connection_ids(member_id)
2379 {
2380 session.peer.send(connection_id, update.clone())?;
2381 }
2382
2383 response.send(proto::Ack {})?;
2384 Ok(())
2385}
2386
2387async fn rename_channel(
2388 request: proto::RenameChannel,
2389 response: Response<proto::RenameChannel>,
2390 session: Session,
2391) -> Result<()> {
2392 let db = session.db().await;
2393 let channel_id = ChannelId::from_proto(request.channel_id);
2394 let new_name = db
2395 .rename_channel(channel_id, session.user_id, &request.name)
2396 .await?;
2397
2398 let channel = proto::Channel {
2399 id: request.channel_id,
2400 name: new_name,
2401 // TODO: Visibility
2402 visibility: proto::ChannelVisibility::ChannelMembers as i32,
2403 };
2404 response.send(proto::RenameChannelResponse {
2405 channel: Some(channel.clone()),
2406 })?;
2407 let mut update = proto::UpdateChannels::default();
2408 update.channels.push(channel);
2409
2410 let member_ids = db.get_channel_members(channel_id).await?;
2411
2412 let connection_pool = session.connection_pool().await;
2413 for member_id in member_ids {
2414 for connection_id in connection_pool.user_connection_ids(member_id) {
2415 session.peer.send(connection_id, update.clone())?;
2416 }
2417 }
2418
2419 Ok(())
2420}
2421
2422async fn link_channel(
2423 request: proto::LinkChannel,
2424 response: Response<proto::LinkChannel>,
2425 session: Session,
2426) -> Result<()> {
2427 let db = session.db().await;
2428 let channel_id = ChannelId::from_proto(request.channel_id);
2429 let to = ChannelId::from_proto(request.to);
2430 let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2431
2432 let members = db.get_channel_members(to).await?;
2433 let connection_pool = session.connection_pool().await;
2434 let update = proto::UpdateChannels {
2435 channels: channels_to_send
2436 .channels
2437 .into_iter()
2438 .map(|channel| proto::Channel {
2439 id: channel.id.to_proto(),
2440 name: channel.name,
2441 // TODO: Visibility
2442 visibility: proto::ChannelVisibility::ChannelMembers as i32,
2443 })
2444 .collect(),
2445 insert_edge: channels_to_send.edges,
2446 ..Default::default()
2447 };
2448 for member_id in members {
2449 for connection_id in connection_pool.user_connection_ids(member_id) {
2450 session.peer.send(connection_id, update.clone())?;
2451 }
2452 }
2453
2454 response.send(Ack {})?;
2455
2456 Ok(())
2457}
2458
2459async fn unlink_channel(
2460 request: proto::UnlinkChannel,
2461 response: Response<proto::UnlinkChannel>,
2462 session: Session,
2463) -> Result<()> {
2464 let db = session.db().await;
2465 let channel_id = ChannelId::from_proto(request.channel_id);
2466 let from = ChannelId::from_proto(request.from);
2467
2468 db.unlink_channel(session.user_id, channel_id, from).await?;
2469
2470 let members = db.get_channel_members(from).await?;
2471
2472 let update = proto::UpdateChannels {
2473 delete_edge: vec![proto::ChannelEdge {
2474 channel_id: channel_id.to_proto(),
2475 parent_id: from.to_proto(),
2476 }],
2477 ..Default::default()
2478 };
2479 let connection_pool = session.connection_pool().await;
2480 for member_id in members {
2481 for connection_id in connection_pool.user_connection_ids(member_id) {
2482 session.peer.send(connection_id, update.clone())?;
2483 }
2484 }
2485
2486 response.send(Ack {})?;
2487
2488 Ok(())
2489}
2490
2491async fn move_channel(
2492 request: proto::MoveChannel,
2493 response: Response<proto::MoveChannel>,
2494 session: Session,
2495) -> Result<()> {
2496 let db = session.db().await;
2497 let channel_id = ChannelId::from_proto(request.channel_id);
2498 let from_parent = ChannelId::from_proto(request.from);
2499 let to = ChannelId::from_proto(request.to);
2500
2501 let channels_to_send = db
2502 .move_channel(session.user_id, channel_id, from_parent, to)
2503 .await?;
2504
2505 if channels_to_send.is_empty() {
2506 response.send(Ack {})?;
2507 return Ok(());
2508 }
2509
2510 let members_from = db.get_channel_members(from_parent).await?;
2511 let members_to = db.get_channel_members(to).await?;
2512
2513 let update = proto::UpdateChannels {
2514 delete_edge: vec![proto::ChannelEdge {
2515 channel_id: channel_id.to_proto(),
2516 parent_id: from_parent.to_proto(),
2517 }],
2518 ..Default::default()
2519 };
2520 let connection_pool = session.connection_pool().await;
2521 for member_id in members_from {
2522 for connection_id in connection_pool.user_connection_ids(member_id) {
2523 session.peer.send(connection_id, update.clone())?;
2524 }
2525 }
2526
2527 let update = proto::UpdateChannels {
2528 channels: channels_to_send
2529 .channels
2530 .into_iter()
2531 .map(|channel| proto::Channel {
2532 id: channel.id.to_proto(),
2533 name: channel.name,
2534 // TODO: Visibility
2535 visibility: proto::ChannelVisibility::ChannelMembers as i32,
2536 })
2537 .collect(),
2538 insert_edge: channels_to_send.edges,
2539 ..Default::default()
2540 };
2541 for member_id in members_to {
2542 for connection_id in connection_pool.user_connection_ids(member_id) {
2543 session.peer.send(connection_id, update.clone())?;
2544 }
2545 }
2546
2547 response.send(Ack {})?;
2548
2549 Ok(())
2550}
2551
2552async fn get_channel_members(
2553 request: proto::GetChannelMembers,
2554 response: Response<proto::GetChannelMembers>,
2555 session: Session,
2556) -> Result<()> {
2557 let db = session.db().await;
2558 let channel_id = ChannelId::from_proto(request.channel_id);
2559 let members = db
2560 .get_channel_member_details(channel_id, session.user_id)
2561 .await?;
2562 response.send(proto::GetChannelMembersResponse { members })?;
2563 Ok(())
2564}
2565
2566async fn respond_to_channel_invite(
2567 request: proto::RespondToChannelInvite,
2568 response: Response<proto::RespondToChannelInvite>,
2569 session: Session,
2570) -> Result<()> {
2571 let db = session.db().await;
2572 let channel_id = ChannelId::from_proto(request.channel_id);
2573 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2574 .await?;
2575
2576 let mut update = proto::UpdateChannels::default();
2577 update
2578 .remove_channel_invitations
2579 .push(channel_id.to_proto());
2580 if request.accept {
2581 let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2582 update
2583 .channels
2584 .extend(
2585 result
2586 .channels
2587 .channels
2588 .into_iter()
2589 .map(|channel| proto::Channel {
2590 id: channel.id.to_proto(),
2591 name: channel.name,
2592 // TODO: Visibility
2593 visibility: ChannelVisibility::ChannelMembers.into(),
2594 }),
2595 );
2596 update.unseen_channel_messages = result.channel_messages;
2597 update.unseen_channel_buffer_changes = result.unseen_buffer_changes;
2598 update.insert_edge = result.channels.edges;
2599 update
2600 .channel_participants
2601 .extend(
2602 result
2603 .channel_participants
2604 .into_iter()
2605 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2606 channel_id: channel_id.to_proto(),
2607 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2608 }),
2609 );
2610 update
2611 .channel_permissions
2612 .extend(
2613 result
2614 .channels_with_admin_privileges
2615 .into_iter()
2616 .map(|channel_id| proto::ChannelPermission {
2617 channel_id: channel_id.to_proto(),
2618 role: proto::ChannelRole::Admin.into(),
2619 }),
2620 );
2621 }
2622 session.peer.send(session.connection_id, update)?;
2623 response.send(proto::Ack {})?;
2624
2625 Ok(())
2626}
2627
2628async fn join_channel(
2629 request: proto::JoinChannel,
2630 response: Response<proto::JoinChannel>,
2631 session: Session,
2632) -> Result<()> {
2633 let channel_id = ChannelId::from_proto(request.channel_id);
2634 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2635
2636 let joined_room = {
2637 leave_room_for_session(&session).await?;
2638 let db = session.db().await;
2639
2640 let room_id = db
2641 .get_or_create_channel_room(channel_id, &live_kit_room, &*RELEASE_CHANNEL_NAME)
2642 .await?;
2643
2644 let joined_room = db
2645 .join_room(
2646 room_id,
2647 session.user_id,
2648 session.connection_id,
2649 RELEASE_CHANNEL_NAME.as_str(),
2650 )
2651 .await?;
2652
2653 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2654 let token = live_kit
2655 .room_token(
2656 &joined_room.room.live_kit_room,
2657 &session.user_id.to_string(),
2658 )
2659 .trace_err()?;
2660
2661 Some(LiveKitConnectionInfo {
2662 server_url: live_kit.url().into(),
2663 token,
2664 })
2665 });
2666
2667 response.send(proto::JoinRoomResponse {
2668 room: Some(joined_room.room.clone()),
2669 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2670 live_kit_connection_info,
2671 })?;
2672
2673 room_updated(&joined_room.room, &session.peer);
2674
2675 joined_room.into_inner()
2676 };
2677
2678 channel_updated(
2679 channel_id,
2680 &joined_room.room,
2681 &joined_room.channel_members,
2682 &session.peer,
2683 &*session.connection_pool().await,
2684 );
2685
2686 update_user_contacts(session.user_id, &session).await?;
2687
2688 Ok(())
2689}
2690
2691async fn join_channel_buffer(
2692 request: proto::JoinChannelBuffer,
2693 response: Response<proto::JoinChannelBuffer>,
2694 session: Session,
2695) -> Result<()> {
2696 let db = session.db().await;
2697 let channel_id = ChannelId::from_proto(request.channel_id);
2698
2699 let open_response = db
2700 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2701 .await?;
2702
2703 let collaborators = open_response.collaborators.clone();
2704 response.send(open_response)?;
2705
2706 let update = UpdateChannelBufferCollaborators {
2707 channel_id: channel_id.to_proto(),
2708 collaborators: collaborators.clone(),
2709 };
2710 channel_buffer_updated(
2711 session.connection_id,
2712 collaborators
2713 .iter()
2714 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2715 &update,
2716 &session.peer,
2717 );
2718
2719 Ok(())
2720}
2721
2722async fn update_channel_buffer(
2723 request: proto::UpdateChannelBuffer,
2724 session: Session,
2725) -> Result<()> {
2726 let db = session.db().await;
2727 let channel_id = ChannelId::from_proto(request.channel_id);
2728
2729 let (collaborators, non_collaborators, epoch, version) = db
2730 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2731 .await?;
2732
2733 channel_buffer_updated(
2734 session.connection_id,
2735 collaborators,
2736 &proto::UpdateChannelBuffer {
2737 channel_id: channel_id.to_proto(),
2738 operations: request.operations,
2739 },
2740 &session.peer,
2741 );
2742
2743 let pool = &*session.connection_pool().await;
2744
2745 broadcast(
2746 None,
2747 non_collaborators
2748 .iter()
2749 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2750 |peer_id| {
2751 session.peer.send(
2752 peer_id.into(),
2753 proto::UpdateChannels {
2754 unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
2755 channel_id: channel_id.to_proto(),
2756 epoch: epoch as u64,
2757 version: version.clone(),
2758 }],
2759 ..Default::default()
2760 },
2761 )
2762 },
2763 );
2764
2765 Ok(())
2766}
2767
2768async fn rejoin_channel_buffers(
2769 request: proto::RejoinChannelBuffers,
2770 response: Response<proto::RejoinChannelBuffers>,
2771 session: Session,
2772) -> Result<()> {
2773 let db = session.db().await;
2774 let buffers = db
2775 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2776 .await?;
2777
2778 for rejoined_buffer in &buffers {
2779 let collaborators_to_notify = rejoined_buffer
2780 .buffer
2781 .collaborators
2782 .iter()
2783 .filter_map(|c| Some(c.peer_id?.into()));
2784 channel_buffer_updated(
2785 session.connection_id,
2786 collaborators_to_notify,
2787 &proto::UpdateChannelBufferCollaborators {
2788 channel_id: rejoined_buffer.buffer.channel_id,
2789 collaborators: rejoined_buffer.buffer.collaborators.clone(),
2790 },
2791 &session.peer,
2792 );
2793 }
2794
2795 response.send(proto::RejoinChannelBuffersResponse {
2796 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2797 })?;
2798
2799 Ok(())
2800}
2801
2802async fn leave_channel_buffer(
2803 request: proto::LeaveChannelBuffer,
2804 response: Response<proto::LeaveChannelBuffer>,
2805 session: Session,
2806) -> Result<()> {
2807 let db = session.db().await;
2808 let channel_id = ChannelId::from_proto(request.channel_id);
2809
2810 let left_buffer = db
2811 .leave_channel_buffer(channel_id, session.connection_id)
2812 .await?;
2813
2814 response.send(Ack {})?;
2815
2816 channel_buffer_updated(
2817 session.connection_id,
2818 left_buffer.connections,
2819 &proto::UpdateChannelBufferCollaborators {
2820 channel_id: channel_id.to_proto(),
2821 collaborators: left_buffer.collaborators,
2822 },
2823 &session.peer,
2824 );
2825
2826 Ok(())
2827}
2828
2829fn channel_buffer_updated<T: EnvelopedMessage>(
2830 sender_id: ConnectionId,
2831 collaborators: impl IntoIterator<Item = ConnectionId>,
2832 message: &T,
2833 peer: &Peer,
2834) {
2835 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2836 peer.send(peer_id.into(), message.clone())
2837 });
2838}
2839
2840async fn send_channel_message(
2841 request: proto::SendChannelMessage,
2842 response: Response<proto::SendChannelMessage>,
2843 session: Session,
2844) -> Result<()> {
2845 // Validate the message body.
2846 let body = request.body.trim().to_string();
2847 if body.len() > MAX_MESSAGE_LEN {
2848 return Err(anyhow!("message is too long"))?;
2849 }
2850 if body.is_empty() {
2851 return Err(anyhow!("message can't be blank"))?;
2852 }
2853
2854 let timestamp = OffsetDateTime::now_utc();
2855 let nonce = request
2856 .nonce
2857 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2858
2859 let channel_id = ChannelId::from_proto(request.channel_id);
2860 let (message_id, connection_ids, non_participants) = session
2861 .db()
2862 .await
2863 .create_channel_message(
2864 channel_id,
2865 session.user_id,
2866 &body,
2867 timestamp,
2868 nonce.clone().into(),
2869 )
2870 .await?;
2871 let message = proto::ChannelMessage {
2872 sender_id: session.user_id.to_proto(),
2873 id: message_id.to_proto(),
2874 body,
2875 timestamp: timestamp.unix_timestamp() as u64,
2876 nonce: Some(nonce),
2877 };
2878 broadcast(Some(session.connection_id), connection_ids, |connection| {
2879 session.peer.send(
2880 connection,
2881 proto::ChannelMessageSent {
2882 channel_id: channel_id.to_proto(),
2883 message: Some(message.clone()),
2884 },
2885 )
2886 });
2887 response.send(proto::SendChannelMessageResponse {
2888 message: Some(message),
2889 })?;
2890
2891 let pool = &*session.connection_pool().await;
2892 broadcast(
2893 None,
2894 non_participants
2895 .iter()
2896 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2897 |peer_id| {
2898 session.peer.send(
2899 peer_id.into(),
2900 proto::UpdateChannels {
2901 unseen_channel_messages: vec![proto::UnseenChannelMessage {
2902 channel_id: channel_id.to_proto(),
2903 message_id: message_id.to_proto(),
2904 }],
2905 ..Default::default()
2906 },
2907 )
2908 },
2909 );
2910
2911 Ok(())
2912}
2913
2914async fn remove_channel_message(
2915 request: proto::RemoveChannelMessage,
2916 response: Response<proto::RemoveChannelMessage>,
2917 session: Session,
2918) -> Result<()> {
2919 let channel_id = ChannelId::from_proto(request.channel_id);
2920 let message_id = MessageId::from_proto(request.message_id);
2921 let connection_ids = session
2922 .db()
2923 .await
2924 .remove_channel_message(channel_id, message_id, session.user_id)
2925 .await?;
2926 broadcast(Some(session.connection_id), connection_ids, |connection| {
2927 session.peer.send(connection, request.clone())
2928 });
2929 response.send(proto::Ack {})?;
2930 Ok(())
2931}
2932
2933async fn acknowledge_channel_message(
2934 request: proto::AckChannelMessage,
2935 session: Session,
2936) -> Result<()> {
2937 let channel_id = ChannelId::from_proto(request.channel_id);
2938 let message_id = MessageId::from_proto(request.message_id);
2939 session
2940 .db()
2941 .await
2942 .observe_channel_message(channel_id, session.user_id, message_id)
2943 .await?;
2944 Ok(())
2945}
2946
2947async fn acknowledge_buffer_version(
2948 request: proto::AckBufferOperation,
2949 session: Session,
2950) -> Result<()> {
2951 let buffer_id = BufferId::from_proto(request.buffer_id);
2952 session
2953 .db()
2954 .await
2955 .observe_buffer_version(
2956 buffer_id,
2957 session.user_id,
2958 request.epoch as i32,
2959 &request.version,
2960 )
2961 .await?;
2962 Ok(())
2963}
2964
2965async fn join_channel_chat(
2966 request: proto::JoinChannelChat,
2967 response: Response<proto::JoinChannelChat>,
2968 session: Session,
2969) -> Result<()> {
2970 let channel_id = ChannelId::from_proto(request.channel_id);
2971
2972 let db = session.db().await;
2973 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2974 .await?;
2975 let messages = db
2976 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2977 .await?;
2978 response.send(proto::JoinChannelChatResponse {
2979 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2980 messages,
2981 })?;
2982 Ok(())
2983}
2984
2985async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2986 let channel_id = ChannelId::from_proto(request.channel_id);
2987 session
2988 .db()
2989 .await
2990 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2991 .await?;
2992 Ok(())
2993}
2994
2995async fn get_channel_messages(
2996 request: proto::GetChannelMessages,
2997 response: Response<proto::GetChannelMessages>,
2998 session: Session,
2999) -> Result<()> {
3000 let channel_id = ChannelId::from_proto(request.channel_id);
3001 let messages = session
3002 .db()
3003 .await
3004 .get_channel_messages(
3005 channel_id,
3006 session.user_id,
3007 MESSAGE_COUNT_PER_PAGE,
3008 Some(MessageId::from_proto(request.before_message_id)),
3009 )
3010 .await?;
3011 response.send(proto::GetChannelMessagesResponse {
3012 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3013 messages,
3014 })?;
3015 Ok(())
3016}
3017
3018async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
3019 let project_id = ProjectId::from_proto(request.project_id);
3020 let project_connection_ids = session
3021 .db()
3022 .await
3023 .project_connection_ids(project_id, session.connection_id)
3024 .await?;
3025 broadcast(
3026 Some(session.connection_id),
3027 project_connection_ids.iter().copied(),
3028 |connection_id| {
3029 session
3030 .peer
3031 .forward_send(session.connection_id, connection_id, request.clone())
3032 },
3033 );
3034 Ok(())
3035}
3036
3037async fn get_private_user_info(
3038 _request: proto::GetPrivateUserInfo,
3039 response: Response<proto::GetPrivateUserInfo>,
3040 session: Session,
3041) -> Result<()> {
3042 let db = session.db().await;
3043
3044 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
3045 let user = db
3046 .get_user_by_id(session.user_id)
3047 .await?
3048 .ok_or_else(|| anyhow!("user not found"))?;
3049 let flags = db.get_user_flags(session.user_id).await?;
3050
3051 response.send(proto::GetPrivateUserInfoResponse {
3052 metrics_id,
3053 staff: user.admin,
3054 flags,
3055 })?;
3056 Ok(())
3057}
3058
3059fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
3060 match message {
3061 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
3062 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
3063 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
3064 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
3065 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
3066 code: frame.code.into(),
3067 reason: frame.reason,
3068 })),
3069 }
3070}
3071
3072fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
3073 match message {
3074 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
3075 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
3076 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
3077 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
3078 AxumMessage::Close(frame) => {
3079 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
3080 code: frame.code.into(),
3081 reason: frame.reason,
3082 }))
3083 }
3084 }
3085}
3086
3087fn build_initial_channels_update(
3088 channels: ChannelsForUser,
3089 channel_invites: Vec<db::Channel>,
3090) -> proto::UpdateChannels {
3091 let mut update = proto::UpdateChannels::default();
3092
3093 for channel in channels.channels.channels {
3094 update.channels.push(proto::Channel {
3095 id: channel.id.to_proto(),
3096 name: channel.name,
3097 // TODO: Visibility
3098 visibility: ChannelVisibility::Public.into(),
3099 });
3100 }
3101
3102 update.unseen_channel_buffer_changes = channels.unseen_buffer_changes;
3103 update.unseen_channel_messages = channels.channel_messages;
3104 update.insert_edge = channels.channels.edges;
3105
3106 for (channel_id, participants) in channels.channel_participants {
3107 update
3108 .channel_participants
3109 .push(proto::ChannelParticipants {
3110 channel_id: channel_id.to_proto(),
3111 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
3112 });
3113 }
3114
3115 update
3116 .channel_permissions
3117 .extend(
3118 channels
3119 .channels_with_admin_privileges
3120 .into_iter()
3121 .map(|id| proto::ChannelPermission {
3122 channel_id: id.to_proto(),
3123 role: proto::ChannelRole::Admin.into(),
3124 }),
3125 );
3126
3127 for channel in channel_invites {
3128 update.channel_invitations.push(proto::Channel {
3129 id: channel.id.to_proto(),
3130 name: channel.name,
3131 // TODO: Visibility
3132 visibility: ChannelVisibility::Public.into(),
3133 });
3134 }
3135
3136 update
3137}
3138
3139fn build_initial_contacts_update(
3140 contacts: Vec<db::Contact>,
3141 pool: &ConnectionPool,
3142) -> proto::UpdateContacts {
3143 let mut update = proto::UpdateContacts::default();
3144
3145 for contact in contacts {
3146 match contact {
3147 db::Contact::Accepted {
3148 user_id,
3149 should_notify,
3150 busy,
3151 } => {
3152 update
3153 .contacts
3154 .push(contact_for_user(user_id, should_notify, busy, &pool));
3155 }
3156 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3157 db::Contact::Incoming {
3158 user_id,
3159 should_notify,
3160 } => update
3161 .incoming_requests
3162 .push(proto::IncomingContactRequest {
3163 requester_id: user_id.to_proto(),
3164 should_notify,
3165 }),
3166 }
3167 }
3168
3169 update
3170}
3171
3172fn contact_for_user(
3173 user_id: UserId,
3174 should_notify: bool,
3175 busy: bool,
3176 pool: &ConnectionPool,
3177) -> proto::Contact {
3178 proto::Contact {
3179 user_id: user_id.to_proto(),
3180 online: pool.is_user_online(user_id),
3181 busy,
3182 should_notify,
3183 }
3184}
3185
3186fn room_updated(room: &proto::Room, peer: &Peer) {
3187 broadcast(
3188 None,
3189 room.participants
3190 .iter()
3191 .filter_map(|participant| Some(participant.peer_id?.into())),
3192 |peer_id| {
3193 peer.send(
3194 peer_id.into(),
3195 proto::RoomUpdated {
3196 room: Some(room.clone()),
3197 },
3198 )
3199 },
3200 );
3201}
3202
3203fn channel_updated(
3204 channel_id: ChannelId,
3205 room: &proto::Room,
3206 channel_members: &[UserId],
3207 peer: &Peer,
3208 pool: &ConnectionPool,
3209) {
3210 let participants = room
3211 .participants
3212 .iter()
3213 .map(|p| p.user_id)
3214 .collect::<Vec<_>>();
3215
3216 broadcast(
3217 None,
3218 channel_members
3219 .iter()
3220 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3221 |peer_id| {
3222 peer.send(
3223 peer_id.into(),
3224 proto::UpdateChannels {
3225 channel_participants: vec![proto::ChannelParticipants {
3226 channel_id: channel_id.to_proto(),
3227 participant_user_ids: participants.clone(),
3228 }],
3229 ..Default::default()
3230 },
3231 )
3232 },
3233 );
3234}
3235
3236async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3237 let db = session.db().await;
3238
3239 let contacts = db.get_contacts(user_id).await?;
3240 let busy = db.is_user_busy(user_id).await?;
3241
3242 let pool = session.connection_pool().await;
3243 let updated_contact = contact_for_user(user_id, false, busy, &pool);
3244 for contact in contacts {
3245 if let db::Contact::Accepted {
3246 user_id: contact_user_id,
3247 ..
3248 } = contact
3249 {
3250 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3251 session
3252 .peer
3253 .send(
3254 contact_conn_id,
3255 proto::UpdateContacts {
3256 contacts: vec![updated_contact.clone()],
3257 remove_contacts: Default::default(),
3258 incoming_requests: Default::default(),
3259 remove_incoming_requests: Default::default(),
3260 outgoing_requests: Default::default(),
3261 remove_outgoing_requests: Default::default(),
3262 },
3263 )
3264 .trace_err();
3265 }
3266 }
3267 }
3268 Ok(())
3269}
3270
3271async fn leave_room_for_session(session: &Session) -> Result<()> {
3272 let mut contacts_to_update = HashSet::default();
3273
3274 let room_id;
3275 let canceled_calls_to_user_ids;
3276 let live_kit_room;
3277 let delete_live_kit_room;
3278 let room;
3279 let channel_members;
3280 let channel_id;
3281
3282 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3283 contacts_to_update.insert(session.user_id);
3284
3285 for project in left_room.left_projects.values() {
3286 project_left(project, session);
3287 }
3288
3289 room_id = RoomId::from_proto(left_room.room.id);
3290 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3291 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3292 delete_live_kit_room = left_room.deleted;
3293 room = mem::take(&mut left_room.room);
3294 channel_members = mem::take(&mut left_room.channel_members);
3295 channel_id = left_room.channel_id;
3296
3297 room_updated(&room, &session.peer);
3298 } else {
3299 return Ok(());
3300 }
3301
3302 if let Some(channel_id) = channel_id {
3303 channel_updated(
3304 channel_id,
3305 &room,
3306 &channel_members,
3307 &session.peer,
3308 &*session.connection_pool().await,
3309 );
3310 }
3311
3312 {
3313 let pool = session.connection_pool().await;
3314 for canceled_user_id in canceled_calls_to_user_ids {
3315 for connection_id in pool.user_connection_ids(canceled_user_id) {
3316 session
3317 .peer
3318 .send(
3319 connection_id,
3320 proto::CallCanceled {
3321 room_id: room_id.to_proto(),
3322 },
3323 )
3324 .trace_err();
3325 }
3326 contacts_to_update.insert(canceled_user_id);
3327 }
3328 }
3329
3330 for contact_user_id in contacts_to_update {
3331 update_user_contacts(contact_user_id, &session).await?;
3332 }
3333
3334 if let Some(live_kit) = session.live_kit_client.as_ref() {
3335 live_kit
3336 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3337 .await
3338 .trace_err();
3339
3340 if delete_live_kit_room {
3341 live_kit.delete_room(live_kit_room).await.trace_err();
3342 }
3343 }
3344
3345 Ok(())
3346}
3347
3348async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3349 let left_channel_buffers = session
3350 .db()
3351 .await
3352 .leave_channel_buffers(session.connection_id)
3353 .await?;
3354
3355 for left_buffer in left_channel_buffers {
3356 channel_buffer_updated(
3357 session.connection_id,
3358 left_buffer.connections,
3359 &proto::UpdateChannelBufferCollaborators {
3360 channel_id: left_buffer.channel_id.to_proto(),
3361 collaborators: left_buffer.collaborators,
3362 },
3363 &session.peer,
3364 );
3365 }
3366
3367 Ok(())
3368}
3369
3370fn project_left(project: &db::LeftProject, session: &Session) {
3371 for connection_id in &project.connection_ids {
3372 if project.host_user_id == session.user_id {
3373 session
3374 .peer
3375 .send(
3376 *connection_id,
3377 proto::UnshareProject {
3378 project_id: project.id.to_proto(),
3379 },
3380 )
3381 .trace_err();
3382 } else {
3383 session
3384 .peer
3385 .send(
3386 *connection_id,
3387 proto::RemoveProjectCollaborator {
3388 project_id: project.id.to_proto(),
3389 peer_id: Some(session.connection_id.into()),
3390 },
3391 )
3392 .trace_err();
3393 }
3394 }
3395}
3396
3397pub trait ResultExt {
3398 type Ok;
3399
3400 fn trace_err(self) -> Option<Self::Ok>;
3401}
3402
3403impl<T, E> ResultExt for Result<T, E>
3404where
3405 E: std::fmt::Debug,
3406{
3407 type Ok = T;
3408
3409 fn trace_err(self) -> Option<T> {
3410 match self {
3411 Ok(value) => Some(value),
3412 Err(error) => {
3413 tracing::error!("{:?}", error);
3414 None
3415 }
3416 }
3417 }
3418}