1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{self, Database, ProjectId, RoomId, User, UserId},
6 executor::Executor,
7 AppState, Result,
8};
9use anyhow::anyhow;
10use async_tungstenite::tungstenite::{
11 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
12};
13use axum::{
14 body::Body,
15 extract::{
16 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
17 ConnectInfo, WebSocketUpgrade,
18 },
19 headers::{Header, HeaderName},
20 http::StatusCode,
21 middleware,
22 response::IntoResponse,
23 routing::get,
24 Extension, Router, TypedHeader,
25};
26use collections::{HashMap, HashSet};
27pub use connection_pool::ConnectionPool;
28use futures::{
29 channel::oneshot,
30 future::{self, BoxFuture},
31 stream::FuturesUnordered,
32 FutureExt, SinkExt, StreamExt, TryStreamExt,
33};
34use lazy_static::lazy_static;
35use prometheus::{register_int_gauge, IntGauge};
36use rpc::{
37 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
38 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
39};
40use serde::{Serialize, Serializer};
41use std::{
42 any::TypeId,
43 fmt,
44 future::Future,
45 marker::PhantomData,
46 mem,
47 net::SocketAddr,
48 ops::{Deref, DerefMut},
49 rc::Rc,
50 sync::{
51 atomic::{AtomicBool, Ordering::SeqCst},
52 Arc,
53 },
54 time::Duration,
55};
56use tokio::sync::{watch, Mutex, MutexGuard};
57use tower::ServiceBuilder;
58use tracing::{info_span, instrument, Instrument};
59
60pub const RECONNECT_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT;
61
62lazy_static! {
63 static ref METRIC_CONNECTIONS: IntGauge =
64 register_int_gauge!("connections", "number of connections").unwrap();
65 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
66 "shared_projects",
67 "number of open projects with one or more guests"
68 )
69 .unwrap();
70}
71
72type MessageHandler =
73 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
74
75struct Response<R> {
76 peer: Arc<Peer>,
77 receipt: Receipt<R>,
78 responded: Arc<AtomicBool>,
79}
80
81impl<R: RequestMessage> Response<R> {
82 fn send(self, payload: R::Response) -> Result<()> {
83 self.responded.store(true, SeqCst);
84 self.peer.respond(self.receipt, payload)?;
85 Ok(())
86 }
87}
88
89#[derive(Clone)]
90struct Session {
91 user_id: UserId,
92 connection_id: ConnectionId,
93 db: Arc<Mutex<DbHandle>>,
94 peer: Arc<Peer>,
95 connection_pool: Arc<Mutex<ConnectionPool>>,
96 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
97}
98
99impl Session {
100 async fn db(&self) -> MutexGuard<DbHandle> {
101 #[cfg(test)]
102 tokio::task::yield_now().await;
103 let guard = self.db.lock().await;
104 #[cfg(test)]
105 tokio::task::yield_now().await;
106 guard
107 }
108
109 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
110 #[cfg(test)]
111 tokio::task::yield_now().await;
112 let guard = self.connection_pool.lock().await;
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 ConnectionPoolGuard {
116 guard,
117 _not_send: PhantomData,
118 }
119 }
120}
121
122impl fmt::Debug for Session {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("Session")
125 .field("user_id", &self.user_id)
126 .field("connection_id", &self.connection_id)
127 .finish()
128 }
129}
130
131struct DbHandle(Arc<Database>);
132
133impl Deref for DbHandle {
134 type Target = Database;
135
136 fn deref(&self) -> &Self::Target {
137 self.0.as_ref()
138 }
139}
140
141pub struct Server {
142 peer: Arc<Peer>,
143 pub(crate) connection_pool: Arc<Mutex<ConnectionPool>>,
144 app_state: Arc<AppState>,
145 executor: Executor,
146 handlers: HashMap<TypeId, MessageHandler>,
147 teardown: watch::Sender<()>,
148}
149
150pub(crate) struct ConnectionPoolGuard<'a> {
151 guard: MutexGuard<'a, ConnectionPool>,
152 _not_send: PhantomData<Rc<()>>,
153}
154
155#[derive(Serialize)]
156pub struct ServerSnapshot<'a> {
157 peer: &'a Peer,
158 #[serde(serialize_with = "serialize_deref")]
159 connection_pool: ConnectionPoolGuard<'a>,
160}
161
162pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
163where
164 S: Serializer,
165 T: Deref<Target = U>,
166 U: Serialize,
167{
168 Serialize::serialize(value.deref(), serializer)
169}
170
171impl Server {
172 pub fn new(app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
173 let mut server = Self {
174 peer: Peer::new(),
175 app_state,
176 executor,
177 connection_pool: Default::default(),
178 handlers: Default::default(),
179 teardown: watch::channel(()).0,
180 };
181
182 server
183 .add_request_handler(ping)
184 .add_request_handler(create_room)
185 .add_request_handler(join_room)
186 .add_message_handler(leave_room)
187 .add_request_handler(call)
188 .add_request_handler(cancel_call)
189 .add_message_handler(decline_call)
190 .add_request_handler(update_participant_location)
191 .add_request_handler(share_project)
192 .add_message_handler(unshare_project)
193 .add_request_handler(join_project)
194 .add_message_handler(leave_project)
195 .add_request_handler(update_project)
196 .add_request_handler(update_worktree)
197 .add_message_handler(start_language_server)
198 .add_message_handler(update_language_server)
199 .add_message_handler(update_diagnostic_summary)
200 .add_request_handler(forward_project_request::<proto::GetHover>)
201 .add_request_handler(forward_project_request::<proto::GetDefinition>)
202 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
203 .add_request_handler(forward_project_request::<proto::GetReferences>)
204 .add_request_handler(forward_project_request::<proto::SearchProject>)
205 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
206 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
207 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
208 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
209 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
210 .add_request_handler(forward_project_request::<proto::GetCompletions>)
211 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
212 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
213 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
214 .add_request_handler(forward_project_request::<proto::PrepareRename>)
215 .add_request_handler(forward_project_request::<proto::PerformRename>)
216 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
217 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
218 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
219 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
220 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
221 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
222 .add_message_handler(create_buffer_for_peer)
223 .add_request_handler(update_buffer)
224 .add_message_handler(update_buffer_file)
225 .add_message_handler(buffer_reloaded)
226 .add_message_handler(buffer_saved)
227 .add_request_handler(save_buffer)
228 .add_request_handler(get_users)
229 .add_request_handler(fuzzy_search_users)
230 .add_request_handler(request_contact)
231 .add_request_handler(remove_contact)
232 .add_request_handler(respond_to_contact_request)
233 .add_request_handler(follow)
234 .add_message_handler(unfollow)
235 .add_message_handler(update_followers)
236 .add_message_handler(update_diff_base)
237 .add_request_handler(get_private_user_info);
238
239 Arc::new(server)
240 }
241
242 pub async fn start(&self) -> Result<()> {
243 self.app_state.db.delete_stale_projects().await?;
244 let db = self.app_state.db.clone();
245 let peer = self.peer.clone();
246 let timeout = self.executor.sleep(RECONNECT_TIMEOUT);
247 let pool = self.connection_pool.clone();
248 let live_kit_client = self.app_state.live_kit_client.clone();
249 self.executor.spawn_detached(async move {
250 timeout.await;
251 if let Some(room_ids) = db.outdated_room_ids().await.trace_err() {
252 for room_id in room_ids {
253 let mut contacts_to_update = HashSet::default();
254 let mut canceled_calls_to_user_ids = Vec::new();
255 let mut live_kit_room = String::new();
256 let mut delete_live_kit_room = false;
257
258 if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
259 room_updated(&refreshed_room.room, &peer);
260 contacts_to_update
261 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
262 contacts_to_update
263 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
264 canceled_calls_to_user_ids =
265 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
266 dbg!(&canceled_calls_to_user_ids);
267 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
268 delete_live_kit_room = refreshed_room.room.participants.is_empty();
269 }
270
271 {
272 let pool = pool.lock().await;
273 for canceled_user_id in canceled_calls_to_user_ids {
274 for connection_id in pool.user_connection_ids(canceled_user_id) {
275 peer.send(connection_id, proto::CallCanceled {}).trace_err();
276 }
277 }
278 }
279
280 for user_id in contacts_to_update {
281 if let Some((busy, contacts)) = db
282 .is_user_busy(user_id)
283 .await
284 .trace_err()
285 .zip(db.get_contacts(user_id).await.trace_err())
286 {
287 let pool = pool.lock().await;
288 let updated_contact = contact_for_user(user_id, false, busy, &pool);
289 for contact in contacts {
290 if let db::Contact::Accepted {
291 user_id: contact_user_id,
292 ..
293 } = contact
294 {
295 for contact_conn_id in pool.user_connection_ids(contact_user_id)
296 {
297 peer.send(
298 contact_conn_id,
299 proto::UpdateContacts {
300 contacts: vec![updated_contact.clone()],
301 remove_contacts: Default::default(),
302 incoming_requests: Default::default(),
303 remove_incoming_requests: Default::default(),
304 outgoing_requests: Default::default(),
305 remove_outgoing_requests: Default::default(),
306 },
307 )
308 .trace_err();
309 }
310 }
311 }
312 }
313 }
314
315 if let Some(live_kit) = live_kit_client.as_ref() {
316 if delete_live_kit_room {
317 live_kit.delete_room(live_kit_room).await.trace_err();
318 }
319 }
320 }
321 }
322 });
323 Ok(())
324 }
325
326 pub fn teardown(&self) {
327 self.peer.reset();
328 let _ = self.teardown.send(());
329 }
330
331 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
332 where
333 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
334 Fut: 'static + Send + Future<Output = Result<()>>,
335 M: EnvelopedMessage,
336 {
337 let prev_handler = self.handlers.insert(
338 TypeId::of::<M>(),
339 Box::new(move |envelope, session| {
340 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
341 let span = info_span!(
342 "handle message",
343 payload_type = envelope.payload_type_name()
344 );
345 span.in_scope(|| {
346 tracing::info!(
347 payload_type = envelope.payload_type_name(),
348 "message received"
349 );
350 });
351 let future = (handler)(*envelope, session);
352 async move {
353 if let Err(error) = future.await {
354 tracing::error!(%error, "error handling message");
355 }
356 }
357 .instrument(span)
358 .boxed()
359 }),
360 );
361 if prev_handler.is_some() {
362 panic!("registered a handler for the same message twice");
363 }
364 self
365 }
366
367 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
368 where
369 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
370 Fut: 'static + Send + Future<Output = Result<()>>,
371 M: EnvelopedMessage,
372 {
373 self.add_handler(move |envelope, session| handler(envelope.payload, session));
374 self
375 }
376
377 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
378 where
379 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
380 Fut: Send + Future<Output = Result<()>>,
381 M: RequestMessage,
382 {
383 let handler = Arc::new(handler);
384 self.add_handler(move |envelope, session| {
385 let receipt = envelope.receipt();
386 let handler = handler.clone();
387 async move {
388 let peer = session.peer.clone();
389 let responded = Arc::new(AtomicBool::default());
390 let response = Response {
391 peer: peer.clone(),
392 responded: responded.clone(),
393 receipt,
394 };
395 match (handler)(envelope.payload, response, session).await {
396 Ok(()) => {
397 if responded.load(std::sync::atomic::Ordering::SeqCst) {
398 Ok(())
399 } else {
400 Err(anyhow!("handler did not send a response"))?
401 }
402 }
403 Err(error) => {
404 peer.respond_with_error(
405 receipt,
406 proto::Error {
407 message: error.to_string(),
408 },
409 )?;
410 Err(error)
411 }
412 }
413 }
414 })
415 }
416
417 pub fn handle_connection(
418 self: &Arc<Self>,
419 connection: Connection,
420 address: String,
421 user: User,
422 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
423 executor: Executor,
424 ) -> impl Future<Output = Result<()>> {
425 let this = self.clone();
426 let user_id = user.id;
427 let login = user.github_login;
428 let span = info_span!("handle connection", %user_id, %login, %address);
429 let mut teardown = self.teardown.subscribe();
430 async move {
431 let (connection_id, handle_io, mut incoming_rx) = this
432 .peer
433 .add_connection(connection, {
434 let executor = executor.clone();
435 move |duration| executor.sleep(duration)
436 });
437
438 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
439 this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?;
440 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
441
442 if let Some(send_connection_id) = send_connection_id.take() {
443 let _ = send_connection_id.send(connection_id);
444 }
445
446 if !user.connected_once {
447 this.peer.send(connection_id, proto::ShowContacts {})?;
448 this.app_state.db.set_user_connected_once(user_id, true).await?;
449 }
450
451 let (contacts, invite_code) = future::try_join(
452 this.app_state.db.get_contacts(user_id),
453 this.app_state.db.get_invite_code_for_user(user_id)
454 ).await?;
455
456 {
457 let mut pool = this.connection_pool.lock().await;
458 pool.add_connection(connection_id, user_id, user.admin);
459 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
460
461 if let Some((code, count)) = invite_code {
462 this.peer.send(connection_id, proto::UpdateInviteInfo {
463 url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
464 count: count as u32,
465 })?;
466 }
467 }
468
469 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
470 this.peer.send(connection_id, incoming_call)?;
471 }
472
473 let session = Session {
474 user_id,
475 connection_id,
476 db: Arc::new(Mutex::new(DbHandle(this.app_state.db.clone()))),
477 peer: this.peer.clone(),
478 connection_pool: this.connection_pool.clone(),
479 live_kit_client: this.app_state.live_kit_client.clone()
480 };
481 update_user_contacts(user_id, &session).await?;
482
483 let handle_io = handle_io.fuse();
484 futures::pin_mut!(handle_io);
485
486 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
487 // This prevents deadlocks when e.g., client A performs a request to client B and
488 // client B performs a request to client A. If both clients stop processing further
489 // messages until their respective request completes, they won't have a chance to
490 // respond to the other client's request and cause a deadlock.
491 //
492 // This arrangement ensures we will attempt to process earlier messages first, but fall
493 // back to processing messages arrived later in the spirit of making progress.
494 let mut foreground_message_handlers = FuturesUnordered::new();
495 loop {
496 let next_message = incoming_rx.next().fuse();
497 futures::pin_mut!(next_message);
498 futures::select_biased! {
499 _ = teardown.changed().fuse() => return Ok(()),
500 result = handle_io => {
501 if let Err(error) = result {
502 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
503 }
504 break;
505 }
506 _ = foreground_message_handlers.next() => {}
507 message = next_message => {
508 if let Some(message) = message {
509 let type_name = message.payload_type_name();
510 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
511 let span_enter = span.enter();
512 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
513 let is_background = message.is_background();
514 let handle_message = (handler)(message, session.clone());
515 drop(span_enter);
516
517 let handle_message = handle_message.instrument(span);
518 if is_background {
519 executor.spawn_detached(handle_message);
520 } else {
521 foreground_message_handlers.push(handle_message);
522 }
523 } else {
524 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
525 }
526 } else {
527 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
528 break;
529 }
530 }
531 }
532 }
533
534 drop(foreground_message_handlers);
535 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
536 if let Err(error) = sign_out(session, teardown, executor).await {
537 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
538 }
539
540 Ok(())
541 }.instrument(span)
542 }
543
544 pub async fn invite_code_redeemed(
545 self: &Arc<Self>,
546 inviter_id: UserId,
547 invitee_id: UserId,
548 ) -> Result<()> {
549 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
550 if let Some(code) = &user.invite_code {
551 let pool = self.connection_pool.lock().await;
552 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
553 for connection_id in pool.user_connection_ids(inviter_id) {
554 self.peer.send(
555 connection_id,
556 proto::UpdateContacts {
557 contacts: vec![invitee_contact.clone()],
558 ..Default::default()
559 },
560 )?;
561 self.peer.send(
562 connection_id,
563 proto::UpdateInviteInfo {
564 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
565 count: user.invite_count as u32,
566 },
567 )?;
568 }
569 }
570 }
571 Ok(())
572 }
573
574 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
575 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
576 if let Some(invite_code) = &user.invite_code {
577 let pool = self.connection_pool.lock().await;
578 for connection_id in pool.user_connection_ids(user_id) {
579 self.peer.send(
580 connection_id,
581 proto::UpdateInviteInfo {
582 url: format!(
583 "{}{}",
584 self.app_state.config.invite_link_prefix, invite_code
585 ),
586 count: user.invite_count as u32,
587 },
588 )?;
589 }
590 }
591 }
592 Ok(())
593 }
594
595 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
596 ServerSnapshot {
597 connection_pool: ConnectionPoolGuard {
598 guard: self.connection_pool.lock().await,
599 _not_send: PhantomData,
600 },
601 peer: &self.peer,
602 }
603 }
604}
605
606impl<'a> Deref for ConnectionPoolGuard<'a> {
607 type Target = ConnectionPool;
608
609 fn deref(&self) -> &Self::Target {
610 &*self.guard
611 }
612}
613
614impl<'a> DerefMut for ConnectionPoolGuard<'a> {
615 fn deref_mut(&mut self) -> &mut Self::Target {
616 &mut *self.guard
617 }
618}
619
620impl<'a> Drop for ConnectionPoolGuard<'a> {
621 fn drop(&mut self) {
622 #[cfg(test)]
623 self.check_invariants();
624 }
625}
626
627fn broadcast<F>(
628 sender_id: ConnectionId,
629 receiver_ids: impl IntoIterator<Item = ConnectionId>,
630 mut f: F,
631) where
632 F: FnMut(ConnectionId) -> anyhow::Result<()>,
633{
634 for receiver_id in receiver_ids {
635 if receiver_id != sender_id {
636 f(receiver_id).trace_err();
637 }
638 }
639}
640
641lazy_static! {
642 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
643}
644
645pub struct ProtocolVersion(u32);
646
647impl Header for ProtocolVersion {
648 fn name() -> &'static HeaderName {
649 &ZED_PROTOCOL_VERSION
650 }
651
652 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
653 where
654 Self: Sized,
655 I: Iterator<Item = &'i axum::http::HeaderValue>,
656 {
657 let version = values
658 .next()
659 .ok_or_else(axum::headers::Error::invalid)?
660 .to_str()
661 .map_err(|_| axum::headers::Error::invalid())?
662 .parse()
663 .map_err(|_| axum::headers::Error::invalid())?;
664 Ok(Self(version))
665 }
666
667 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
668 values.extend([self.0.to_string().parse().unwrap()]);
669 }
670}
671
672pub fn routes(server: Arc<Server>) -> Router<Body> {
673 Router::new()
674 .route("/rpc", get(handle_websocket_request))
675 .layer(
676 ServiceBuilder::new()
677 .layer(Extension(server.app_state.clone()))
678 .layer(middleware::from_fn(auth::validate_header)),
679 )
680 .route("/metrics", get(handle_metrics))
681 .layer(Extension(server))
682}
683
684pub async fn handle_websocket_request(
685 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
686 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
687 Extension(server): Extension<Arc<Server>>,
688 Extension(user): Extension<User>,
689 ws: WebSocketUpgrade,
690) -> axum::response::Response {
691 if protocol_version != rpc::PROTOCOL_VERSION {
692 return (
693 StatusCode::UPGRADE_REQUIRED,
694 "client must be upgraded".to_string(),
695 )
696 .into_response();
697 }
698 let socket_address = socket_address.to_string();
699 ws.on_upgrade(move |socket| {
700 use util::ResultExt;
701 let socket = socket
702 .map_ok(to_tungstenite_message)
703 .err_into()
704 .with(|message| async move { Ok(to_axum_message(message)) });
705 let connection = Connection::new(Box::pin(socket));
706 async move {
707 server
708 .handle_connection(connection, socket_address, user, None, Executor::Production)
709 .await
710 .log_err();
711 }
712 })
713}
714
715pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
716 let connections = server
717 .connection_pool
718 .lock()
719 .await
720 .connections()
721 .filter(|connection| !connection.admin)
722 .count();
723
724 METRIC_CONNECTIONS.set(connections as _);
725
726 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
727 METRIC_SHARED_PROJECTS.set(shared_projects as _);
728
729 let encoder = prometheus::TextEncoder::new();
730 let metric_families = prometheus::gather();
731 let encoded_metrics = encoder
732 .encode_to_string(&metric_families)
733 .map_err(|err| anyhow!("{}", err))?;
734 Ok(encoded_metrics)
735}
736
737#[instrument(err, skip(executor))]
738async fn sign_out(
739 session: Session,
740 mut teardown: watch::Receiver<()>,
741 executor: Executor,
742) -> Result<()> {
743 session.peer.disconnect(session.connection_id);
744 session
745 .connection_pool()
746 .await
747 .remove_connection(session.connection_id)?;
748
749 if let Some(mut left_projects) = session
750 .db()
751 .await
752 .connection_lost(session.connection_id)
753 .await
754 .trace_err()
755 {
756 for left_project in mem::take(&mut *left_projects) {
757 project_left(&left_project, &session);
758 }
759 }
760
761 futures::select_biased! {
762 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
763 leave_room_for_session(&session).await.trace_err();
764
765 if !session
766 .connection_pool()
767 .await
768 .is_user_online(session.user_id)
769 {
770 let db = session.db().await;
771 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
772 room_updated(&room, &session.peer);
773 }
774 }
775 update_user_contacts(session.user_id, &session).await?;
776 }
777 _ = teardown.changed().fuse() => {}
778 }
779
780 Ok(())
781}
782
783async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
784 response.send(proto::Ack {})?;
785 Ok(())
786}
787
788async fn create_room(
789 _request: proto::CreateRoom,
790 response: Response<proto::CreateRoom>,
791 session: Session,
792) -> Result<()> {
793 let live_kit_room = nanoid::nanoid!(30);
794 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
795 if let Some(_) = live_kit
796 .create_room(live_kit_room.clone())
797 .await
798 .trace_err()
799 {
800 if let Some(token) = live_kit
801 .room_token(&live_kit_room, &session.connection_id.to_string())
802 .trace_err()
803 {
804 Some(proto::LiveKitConnectionInfo {
805 server_url: live_kit.url().into(),
806 token,
807 })
808 } else {
809 None
810 }
811 } else {
812 None
813 }
814 } else {
815 None
816 };
817
818 {
819 let room = session
820 .db()
821 .await
822 .create_room(session.user_id, session.connection_id, &live_kit_room)
823 .await?;
824
825 response.send(proto::CreateRoomResponse {
826 room: Some(room.clone()),
827 live_kit_connection_info,
828 })?;
829 }
830
831 update_user_contacts(session.user_id, &session).await?;
832 Ok(())
833}
834
835async fn join_room(
836 request: proto::JoinRoom,
837 response: Response<proto::JoinRoom>,
838 session: Session,
839) -> Result<()> {
840 let room = {
841 let room = session
842 .db()
843 .await
844 .join_room(
845 RoomId::from_proto(request.id),
846 session.user_id,
847 session.connection_id,
848 )
849 .await?;
850 room_updated(&room, &session.peer);
851 room.clone()
852 };
853
854 for connection_id in session
855 .connection_pool()
856 .await
857 .user_connection_ids(session.user_id)
858 {
859 session
860 .peer
861 .send(connection_id, proto::CallCanceled {})
862 .trace_err();
863 }
864
865 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
866 if let Some(token) = live_kit
867 .room_token(&room.live_kit_room, &session.connection_id.to_string())
868 .trace_err()
869 {
870 Some(proto::LiveKitConnectionInfo {
871 server_url: live_kit.url().into(),
872 token,
873 })
874 } else {
875 None
876 }
877 } else {
878 None
879 };
880
881 response.send(proto::JoinRoomResponse {
882 room: Some(room),
883 live_kit_connection_info,
884 })?;
885
886 update_user_contacts(session.user_id, &session).await?;
887 Ok(())
888}
889
890async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
891 leave_room_for_session(&session).await
892}
893
894async fn call(
895 request: proto::Call,
896 response: Response<proto::Call>,
897 session: Session,
898) -> Result<()> {
899 let room_id = RoomId::from_proto(request.room_id);
900 let calling_user_id = session.user_id;
901 let calling_connection_id = session.connection_id;
902 let called_user_id = UserId::from_proto(request.called_user_id);
903 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
904 if !session
905 .db()
906 .await
907 .has_contact(calling_user_id, called_user_id)
908 .await?
909 {
910 return Err(anyhow!("cannot call a user who isn't a contact"))?;
911 }
912
913 let incoming_call = {
914 let (room, incoming_call) = &mut *session
915 .db()
916 .await
917 .call(
918 room_id,
919 calling_user_id,
920 calling_connection_id,
921 called_user_id,
922 initial_project_id,
923 )
924 .await?;
925 room_updated(&room, &session.peer);
926 mem::take(incoming_call)
927 };
928 update_user_contacts(called_user_id, &session).await?;
929
930 let mut calls = session
931 .connection_pool()
932 .await
933 .user_connection_ids(called_user_id)
934 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
935 .collect::<FuturesUnordered<_>>();
936
937 while let Some(call_response) = calls.next().await {
938 match call_response.as_ref() {
939 Ok(_) => {
940 response.send(proto::Ack {})?;
941 return Ok(());
942 }
943 Err(_) => {
944 call_response.trace_err();
945 }
946 }
947 }
948
949 {
950 let room = session
951 .db()
952 .await
953 .call_failed(room_id, called_user_id)
954 .await?;
955 room_updated(&room, &session.peer);
956 }
957 update_user_contacts(called_user_id, &session).await?;
958
959 Err(anyhow!("failed to ring user"))?
960}
961
962async fn cancel_call(
963 request: proto::CancelCall,
964 response: Response<proto::CancelCall>,
965 session: Session,
966) -> Result<()> {
967 let called_user_id = UserId::from_proto(request.called_user_id);
968 let room_id = RoomId::from_proto(request.room_id);
969 {
970 let room = session
971 .db()
972 .await
973 .cancel_call(Some(room_id), session.connection_id, called_user_id)
974 .await?;
975 room_updated(&room, &session.peer);
976 }
977
978 for connection_id in session
979 .connection_pool()
980 .await
981 .user_connection_ids(called_user_id)
982 {
983 session
984 .peer
985 .send(connection_id, proto::CallCanceled {})
986 .trace_err();
987 }
988 response.send(proto::Ack {})?;
989
990 update_user_contacts(called_user_id, &session).await?;
991 Ok(())
992}
993
994async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
995 let room_id = RoomId::from_proto(message.room_id);
996 {
997 let room = session
998 .db()
999 .await
1000 .decline_call(Some(room_id), session.user_id)
1001 .await?;
1002 room_updated(&room, &session.peer);
1003 }
1004
1005 for connection_id in session
1006 .connection_pool()
1007 .await
1008 .user_connection_ids(session.user_id)
1009 {
1010 session
1011 .peer
1012 .send(connection_id, proto::CallCanceled {})
1013 .trace_err();
1014 }
1015 update_user_contacts(session.user_id, &session).await?;
1016 Ok(())
1017}
1018
1019async fn update_participant_location(
1020 request: proto::UpdateParticipantLocation,
1021 response: Response<proto::UpdateParticipantLocation>,
1022 session: Session,
1023) -> Result<()> {
1024 let room_id = RoomId::from_proto(request.room_id);
1025 let location = request
1026 .location
1027 .ok_or_else(|| anyhow!("invalid location"))?;
1028 let room = session
1029 .db()
1030 .await
1031 .update_room_participant_location(room_id, session.connection_id, location)
1032 .await?;
1033 room_updated(&room, &session.peer);
1034 response.send(proto::Ack {})?;
1035 Ok(())
1036}
1037
1038async fn share_project(
1039 request: proto::ShareProject,
1040 response: Response<proto::ShareProject>,
1041 session: Session,
1042) -> Result<()> {
1043 let (project_id, room) = &*session
1044 .db()
1045 .await
1046 .share_project(
1047 RoomId::from_proto(request.room_id),
1048 session.connection_id,
1049 &request.worktrees,
1050 )
1051 .await?;
1052 response.send(proto::ShareProjectResponse {
1053 project_id: project_id.to_proto(),
1054 })?;
1055 room_updated(&room, &session.peer);
1056
1057 Ok(())
1058}
1059
1060async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1061 let project_id = ProjectId::from_proto(message.project_id);
1062
1063 let (room, guest_connection_ids) = &*session
1064 .db()
1065 .await
1066 .unshare_project(project_id, session.connection_id)
1067 .await?;
1068
1069 broadcast(
1070 session.connection_id,
1071 guest_connection_ids.iter().copied(),
1072 |conn_id| session.peer.send(conn_id, message.clone()),
1073 );
1074 room_updated(&room, &session.peer);
1075
1076 Ok(())
1077}
1078
1079async fn join_project(
1080 request: proto::JoinProject,
1081 response: Response<proto::JoinProject>,
1082 session: Session,
1083) -> Result<()> {
1084 let project_id = ProjectId::from_proto(request.project_id);
1085 let guest_user_id = session.user_id;
1086
1087 tracing::info!(%project_id, "join project");
1088
1089 let (project, replica_id) = &mut *session
1090 .db()
1091 .await
1092 .join_project(project_id, session.connection_id)
1093 .await?;
1094
1095 let collaborators = project
1096 .collaborators
1097 .iter()
1098 .filter(|collaborator| collaborator.connection_id != session.connection_id.0 as i32)
1099 .map(|collaborator| proto::Collaborator {
1100 peer_id: collaborator.connection_id as u32,
1101 replica_id: collaborator.replica_id.0 as u32,
1102 user_id: collaborator.user_id.to_proto(),
1103 })
1104 .collect::<Vec<_>>();
1105 let worktrees = project
1106 .worktrees
1107 .iter()
1108 .map(|(id, worktree)| proto::WorktreeMetadata {
1109 id: *id,
1110 root_name: worktree.root_name.clone(),
1111 visible: worktree.visible,
1112 abs_path: worktree.abs_path.clone(),
1113 })
1114 .collect::<Vec<_>>();
1115
1116 for collaborator in &collaborators {
1117 session
1118 .peer
1119 .send(
1120 ConnectionId(collaborator.peer_id),
1121 proto::AddProjectCollaborator {
1122 project_id: project_id.to_proto(),
1123 collaborator: Some(proto::Collaborator {
1124 peer_id: session.connection_id.0,
1125 replica_id: replica_id.0 as u32,
1126 user_id: guest_user_id.to_proto(),
1127 }),
1128 },
1129 )
1130 .trace_err();
1131 }
1132
1133 // First, we send the metadata associated with each worktree.
1134 response.send(proto::JoinProjectResponse {
1135 worktrees: worktrees.clone(),
1136 replica_id: replica_id.0 as u32,
1137 collaborators: collaborators.clone(),
1138 language_servers: project.language_servers.clone(),
1139 })?;
1140
1141 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1142 #[cfg(any(test, feature = "test-support"))]
1143 const MAX_CHUNK_SIZE: usize = 2;
1144 #[cfg(not(any(test, feature = "test-support")))]
1145 const MAX_CHUNK_SIZE: usize = 256;
1146
1147 // Stream this worktree's entries.
1148 let message = proto::UpdateWorktree {
1149 project_id: project_id.to_proto(),
1150 worktree_id,
1151 abs_path: worktree.abs_path.clone(),
1152 root_name: worktree.root_name,
1153 updated_entries: worktree.entries,
1154 removed_entries: Default::default(),
1155 scan_id: worktree.scan_id,
1156 is_last_update: worktree.is_complete,
1157 };
1158 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1159 session.peer.send(session.connection_id, update.clone())?;
1160 }
1161
1162 // Stream this worktree's diagnostics.
1163 for summary in worktree.diagnostic_summaries {
1164 session.peer.send(
1165 session.connection_id,
1166 proto::UpdateDiagnosticSummary {
1167 project_id: project_id.to_proto(),
1168 worktree_id: worktree.id,
1169 summary: Some(summary),
1170 },
1171 )?;
1172 }
1173 }
1174
1175 for language_server in &project.language_servers {
1176 session.peer.send(
1177 session.connection_id,
1178 proto::UpdateLanguageServer {
1179 project_id: project_id.to_proto(),
1180 language_server_id: language_server.id,
1181 variant: Some(
1182 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1183 proto::LspDiskBasedDiagnosticsUpdated {},
1184 ),
1185 ),
1186 },
1187 )?;
1188 }
1189
1190 Ok(())
1191}
1192
1193async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1194 let sender_id = session.connection_id;
1195 let project_id = ProjectId::from_proto(request.project_id);
1196
1197 let project = session
1198 .db()
1199 .await
1200 .leave_project(project_id, sender_id)
1201 .await?;
1202 tracing::info!(
1203 %project_id,
1204 host_user_id = %project.host_user_id,
1205 host_connection_id = %project.host_connection_id,
1206 "leave project"
1207 );
1208 project_left(&project, &session);
1209
1210 Ok(())
1211}
1212
1213async fn update_project(
1214 request: proto::UpdateProject,
1215 response: Response<proto::UpdateProject>,
1216 session: Session,
1217) -> Result<()> {
1218 let project_id = ProjectId::from_proto(request.project_id);
1219 let (room, guest_connection_ids) = &*session
1220 .db()
1221 .await
1222 .update_project(project_id, session.connection_id, &request.worktrees)
1223 .await?;
1224 broadcast(
1225 session.connection_id,
1226 guest_connection_ids.iter().copied(),
1227 |connection_id| {
1228 session
1229 .peer
1230 .forward_send(session.connection_id, connection_id, request.clone())
1231 },
1232 );
1233 room_updated(&room, &session.peer);
1234 response.send(proto::Ack {})?;
1235
1236 Ok(())
1237}
1238
1239async fn update_worktree(
1240 request: proto::UpdateWorktree,
1241 response: Response<proto::UpdateWorktree>,
1242 session: Session,
1243) -> Result<()> {
1244 let guest_connection_ids = session
1245 .db()
1246 .await
1247 .update_worktree(&request, session.connection_id)
1248 .await?;
1249
1250 broadcast(
1251 session.connection_id,
1252 guest_connection_ids.iter().copied(),
1253 |connection_id| {
1254 session
1255 .peer
1256 .forward_send(session.connection_id, connection_id, request.clone())
1257 },
1258 );
1259 response.send(proto::Ack {})?;
1260 Ok(())
1261}
1262
1263async fn update_diagnostic_summary(
1264 message: proto::UpdateDiagnosticSummary,
1265 session: Session,
1266) -> Result<()> {
1267 let guest_connection_ids = session
1268 .db()
1269 .await
1270 .update_diagnostic_summary(&message, session.connection_id)
1271 .await?;
1272
1273 broadcast(
1274 session.connection_id,
1275 guest_connection_ids.iter().copied(),
1276 |connection_id| {
1277 session
1278 .peer
1279 .forward_send(session.connection_id, connection_id, message.clone())
1280 },
1281 );
1282
1283 Ok(())
1284}
1285
1286async fn start_language_server(
1287 request: proto::StartLanguageServer,
1288 session: Session,
1289) -> Result<()> {
1290 let guest_connection_ids = session
1291 .db()
1292 .await
1293 .start_language_server(&request, session.connection_id)
1294 .await?;
1295
1296 broadcast(
1297 session.connection_id,
1298 guest_connection_ids.iter().copied(),
1299 |connection_id| {
1300 session
1301 .peer
1302 .forward_send(session.connection_id, connection_id, request.clone())
1303 },
1304 );
1305 Ok(())
1306}
1307
1308async fn update_language_server(
1309 request: proto::UpdateLanguageServer,
1310 session: Session,
1311) -> Result<()> {
1312 let project_id = ProjectId::from_proto(request.project_id);
1313 let project_connection_ids = session
1314 .db()
1315 .await
1316 .project_connection_ids(project_id, session.connection_id)
1317 .await?;
1318 broadcast(
1319 session.connection_id,
1320 project_connection_ids.iter().copied(),
1321 |connection_id| {
1322 session
1323 .peer
1324 .forward_send(session.connection_id, connection_id, request.clone())
1325 },
1326 );
1327 Ok(())
1328}
1329
1330async fn forward_project_request<T>(
1331 request: T,
1332 response: Response<T>,
1333 session: Session,
1334) -> Result<()>
1335where
1336 T: EntityMessage + RequestMessage,
1337{
1338 let project_id = ProjectId::from_proto(request.remote_entity_id());
1339 let host_connection_id = {
1340 let collaborators = session
1341 .db()
1342 .await
1343 .project_collaborators(project_id, session.connection_id)
1344 .await?;
1345 ConnectionId(
1346 collaborators
1347 .iter()
1348 .find(|collaborator| collaborator.is_host)
1349 .ok_or_else(|| anyhow!("host not found"))?
1350 .connection_id as u32,
1351 )
1352 };
1353
1354 let payload = session
1355 .peer
1356 .forward_request(session.connection_id, host_connection_id, request)
1357 .await?;
1358
1359 response.send(payload)?;
1360 Ok(())
1361}
1362
1363async fn save_buffer(
1364 request: proto::SaveBuffer,
1365 response: Response<proto::SaveBuffer>,
1366 session: Session,
1367) -> Result<()> {
1368 let project_id = ProjectId::from_proto(request.project_id);
1369 let host_connection_id = {
1370 let collaborators = session
1371 .db()
1372 .await
1373 .project_collaborators(project_id, session.connection_id)
1374 .await?;
1375 let host = collaborators
1376 .iter()
1377 .find(|collaborator| collaborator.is_host)
1378 .ok_or_else(|| anyhow!("host not found"))?;
1379 ConnectionId(host.connection_id as u32)
1380 };
1381 let response_payload = session
1382 .peer
1383 .forward_request(session.connection_id, host_connection_id, request.clone())
1384 .await?;
1385
1386 let mut collaborators = session
1387 .db()
1388 .await
1389 .project_collaborators(project_id, session.connection_id)
1390 .await?;
1391 collaborators
1392 .retain(|collaborator| collaborator.connection_id != session.connection_id.0 as i32);
1393 let project_connection_ids = collaborators
1394 .iter()
1395 .map(|collaborator| ConnectionId(collaborator.connection_id as u32));
1396 broadcast(host_connection_id, project_connection_ids, |conn_id| {
1397 session
1398 .peer
1399 .forward_send(host_connection_id, conn_id, response_payload.clone())
1400 });
1401 response.send(response_payload)?;
1402 Ok(())
1403}
1404
1405async fn create_buffer_for_peer(
1406 request: proto::CreateBufferForPeer,
1407 session: Session,
1408) -> Result<()> {
1409 session.peer.forward_send(
1410 session.connection_id,
1411 ConnectionId(request.peer_id),
1412 request,
1413 )?;
1414 Ok(())
1415}
1416
1417async fn update_buffer(
1418 request: proto::UpdateBuffer,
1419 response: Response<proto::UpdateBuffer>,
1420 session: Session,
1421) -> Result<()> {
1422 let project_id = ProjectId::from_proto(request.project_id);
1423 let project_connection_ids = session
1424 .db()
1425 .await
1426 .project_connection_ids(project_id, session.connection_id)
1427 .await?;
1428
1429 broadcast(
1430 session.connection_id,
1431 project_connection_ids.iter().copied(),
1432 |connection_id| {
1433 session
1434 .peer
1435 .forward_send(session.connection_id, connection_id, request.clone())
1436 },
1437 );
1438 response.send(proto::Ack {})?;
1439 Ok(())
1440}
1441
1442async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1443 let project_id = ProjectId::from_proto(request.project_id);
1444 let project_connection_ids = session
1445 .db()
1446 .await
1447 .project_connection_ids(project_id, session.connection_id)
1448 .await?;
1449
1450 broadcast(
1451 session.connection_id,
1452 project_connection_ids.iter().copied(),
1453 |connection_id| {
1454 session
1455 .peer
1456 .forward_send(session.connection_id, connection_id, request.clone())
1457 },
1458 );
1459 Ok(())
1460}
1461
1462async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1463 let project_id = ProjectId::from_proto(request.project_id);
1464 let project_connection_ids = session
1465 .db()
1466 .await
1467 .project_connection_ids(project_id, session.connection_id)
1468 .await?;
1469 broadcast(
1470 session.connection_id,
1471 project_connection_ids.iter().copied(),
1472 |connection_id| {
1473 session
1474 .peer
1475 .forward_send(session.connection_id, connection_id, request.clone())
1476 },
1477 );
1478 Ok(())
1479}
1480
1481async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1482 let project_id = ProjectId::from_proto(request.project_id);
1483 let project_connection_ids = session
1484 .db()
1485 .await
1486 .project_connection_ids(project_id, session.connection_id)
1487 .await?;
1488 broadcast(
1489 session.connection_id,
1490 project_connection_ids.iter().copied(),
1491 |connection_id| {
1492 session
1493 .peer
1494 .forward_send(session.connection_id, connection_id, request.clone())
1495 },
1496 );
1497 Ok(())
1498}
1499
1500async fn follow(
1501 request: proto::Follow,
1502 response: Response<proto::Follow>,
1503 session: Session,
1504) -> Result<()> {
1505 let project_id = ProjectId::from_proto(request.project_id);
1506 let leader_id = ConnectionId(request.leader_id);
1507 let follower_id = session.connection_id;
1508 {
1509 let project_connection_ids = session
1510 .db()
1511 .await
1512 .project_connection_ids(project_id, session.connection_id)
1513 .await?;
1514
1515 if !project_connection_ids.contains(&leader_id) {
1516 Err(anyhow!("no such peer"))?;
1517 }
1518 }
1519
1520 let mut response_payload = session
1521 .peer
1522 .forward_request(session.connection_id, leader_id, request)
1523 .await?;
1524 response_payload
1525 .views
1526 .retain(|view| view.leader_id != Some(follower_id.0));
1527 response.send(response_payload)?;
1528 Ok(())
1529}
1530
1531async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1532 let project_id = ProjectId::from_proto(request.project_id);
1533 let leader_id = ConnectionId(request.leader_id);
1534 let project_connection_ids = session
1535 .db()
1536 .await
1537 .project_connection_ids(project_id, session.connection_id)
1538 .await?;
1539 if !project_connection_ids.contains(&leader_id) {
1540 Err(anyhow!("no such peer"))?;
1541 }
1542 session
1543 .peer
1544 .forward_send(session.connection_id, leader_id, request)?;
1545 Ok(())
1546}
1547
1548async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1549 let project_id = ProjectId::from_proto(request.project_id);
1550 let project_connection_ids = session
1551 .db
1552 .lock()
1553 .await
1554 .project_connection_ids(project_id, session.connection_id)
1555 .await?;
1556
1557 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1558 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1559 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1560 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1561 });
1562 for follower_id in &request.follower_ids {
1563 let follower_id = ConnectionId(*follower_id);
1564 if project_connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1565 session
1566 .peer
1567 .forward_send(session.connection_id, follower_id, request.clone())?;
1568 }
1569 }
1570 Ok(())
1571}
1572
1573async fn get_users(
1574 request: proto::GetUsers,
1575 response: Response<proto::GetUsers>,
1576 session: Session,
1577) -> Result<()> {
1578 let user_ids = request
1579 .user_ids
1580 .into_iter()
1581 .map(UserId::from_proto)
1582 .collect();
1583 let users = session
1584 .db()
1585 .await
1586 .get_users_by_ids(user_ids)
1587 .await?
1588 .into_iter()
1589 .map(|user| proto::User {
1590 id: user.id.to_proto(),
1591 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1592 github_login: user.github_login,
1593 })
1594 .collect();
1595 response.send(proto::UsersResponse { users })?;
1596 Ok(())
1597}
1598
1599async fn fuzzy_search_users(
1600 request: proto::FuzzySearchUsers,
1601 response: Response<proto::FuzzySearchUsers>,
1602 session: Session,
1603) -> Result<()> {
1604 let query = request.query;
1605 let users = match query.len() {
1606 0 => vec![],
1607 1 | 2 => session
1608 .db()
1609 .await
1610 .get_user_by_github_account(&query, None)
1611 .await?
1612 .into_iter()
1613 .collect(),
1614 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1615 };
1616 let users = users
1617 .into_iter()
1618 .filter(|user| user.id != session.user_id)
1619 .map(|user| proto::User {
1620 id: user.id.to_proto(),
1621 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1622 github_login: user.github_login,
1623 })
1624 .collect();
1625 response.send(proto::UsersResponse { users })?;
1626 Ok(())
1627}
1628
1629async fn request_contact(
1630 request: proto::RequestContact,
1631 response: Response<proto::RequestContact>,
1632 session: Session,
1633) -> Result<()> {
1634 let requester_id = session.user_id;
1635 let responder_id = UserId::from_proto(request.responder_id);
1636 if requester_id == responder_id {
1637 return Err(anyhow!("cannot add yourself as a contact"))?;
1638 }
1639
1640 session
1641 .db()
1642 .await
1643 .send_contact_request(requester_id, responder_id)
1644 .await?;
1645
1646 // Update outgoing contact requests of requester
1647 let mut update = proto::UpdateContacts::default();
1648 update.outgoing_requests.push(responder_id.to_proto());
1649 for connection_id in session
1650 .connection_pool()
1651 .await
1652 .user_connection_ids(requester_id)
1653 {
1654 session.peer.send(connection_id, update.clone())?;
1655 }
1656
1657 // Update incoming contact requests of responder
1658 let mut update = proto::UpdateContacts::default();
1659 update
1660 .incoming_requests
1661 .push(proto::IncomingContactRequest {
1662 requester_id: requester_id.to_proto(),
1663 should_notify: true,
1664 });
1665 for connection_id in session
1666 .connection_pool()
1667 .await
1668 .user_connection_ids(responder_id)
1669 {
1670 session.peer.send(connection_id, update.clone())?;
1671 }
1672
1673 response.send(proto::Ack {})?;
1674 Ok(())
1675}
1676
1677async fn respond_to_contact_request(
1678 request: proto::RespondToContactRequest,
1679 response: Response<proto::RespondToContactRequest>,
1680 session: Session,
1681) -> Result<()> {
1682 let responder_id = session.user_id;
1683 let requester_id = UserId::from_proto(request.requester_id);
1684 let db = session.db().await;
1685 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1686 db.dismiss_contact_notification(responder_id, requester_id)
1687 .await?;
1688 } else {
1689 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1690
1691 db.respond_to_contact_request(responder_id, requester_id, accept)
1692 .await?;
1693 let requester_busy = db.is_user_busy(requester_id).await?;
1694 let responder_busy = db.is_user_busy(responder_id).await?;
1695
1696 let pool = session.connection_pool().await;
1697 // Update responder with new contact
1698 let mut update = proto::UpdateContacts::default();
1699 if accept {
1700 update
1701 .contacts
1702 .push(contact_for_user(requester_id, false, requester_busy, &pool));
1703 }
1704 update
1705 .remove_incoming_requests
1706 .push(requester_id.to_proto());
1707 for connection_id in pool.user_connection_ids(responder_id) {
1708 session.peer.send(connection_id, update.clone())?;
1709 }
1710
1711 // Update requester with new contact
1712 let mut update = proto::UpdateContacts::default();
1713 if accept {
1714 update
1715 .contacts
1716 .push(contact_for_user(responder_id, true, responder_busy, &pool));
1717 }
1718 update
1719 .remove_outgoing_requests
1720 .push(responder_id.to_proto());
1721 for connection_id in pool.user_connection_ids(requester_id) {
1722 session.peer.send(connection_id, update.clone())?;
1723 }
1724 }
1725
1726 response.send(proto::Ack {})?;
1727 Ok(())
1728}
1729
1730async fn remove_contact(
1731 request: proto::RemoveContact,
1732 response: Response<proto::RemoveContact>,
1733 session: Session,
1734) -> Result<()> {
1735 let requester_id = session.user_id;
1736 let responder_id = UserId::from_proto(request.user_id);
1737 let db = session.db().await;
1738 db.remove_contact(requester_id, responder_id).await?;
1739
1740 let pool = session.connection_pool().await;
1741 // Update outgoing contact requests of requester
1742 let mut update = proto::UpdateContacts::default();
1743 update
1744 .remove_outgoing_requests
1745 .push(responder_id.to_proto());
1746 for connection_id in pool.user_connection_ids(requester_id) {
1747 session.peer.send(connection_id, update.clone())?;
1748 }
1749
1750 // Update incoming contact requests of responder
1751 let mut update = proto::UpdateContacts::default();
1752 update
1753 .remove_incoming_requests
1754 .push(requester_id.to_proto());
1755 for connection_id in pool.user_connection_ids(responder_id) {
1756 session.peer.send(connection_id, update.clone())?;
1757 }
1758
1759 response.send(proto::Ack {})?;
1760 Ok(())
1761}
1762
1763async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
1764 let project_id = ProjectId::from_proto(request.project_id);
1765 let project_connection_ids = session
1766 .db()
1767 .await
1768 .project_connection_ids(project_id, session.connection_id)
1769 .await?;
1770 broadcast(
1771 session.connection_id,
1772 project_connection_ids.iter().copied(),
1773 |connection_id| {
1774 session
1775 .peer
1776 .forward_send(session.connection_id, connection_id, request.clone())
1777 },
1778 );
1779 Ok(())
1780}
1781
1782async fn get_private_user_info(
1783 _request: proto::GetPrivateUserInfo,
1784 response: Response<proto::GetPrivateUserInfo>,
1785 session: Session,
1786) -> Result<()> {
1787 let metrics_id = session
1788 .db()
1789 .await
1790 .get_user_metrics_id(session.user_id)
1791 .await?;
1792 let user = session
1793 .db()
1794 .await
1795 .get_user_by_id(session.user_id)
1796 .await?
1797 .ok_or_else(|| anyhow!("user not found"))?;
1798 response.send(proto::GetPrivateUserInfoResponse {
1799 metrics_id,
1800 staff: user.admin,
1801 })?;
1802 Ok(())
1803}
1804
1805fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1806 match message {
1807 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1808 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1809 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1810 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1811 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1812 code: frame.code.into(),
1813 reason: frame.reason,
1814 })),
1815 }
1816}
1817
1818fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1819 match message {
1820 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1821 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1822 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1823 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1824 AxumMessage::Close(frame) => {
1825 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1826 code: frame.code.into(),
1827 reason: frame.reason,
1828 }))
1829 }
1830 }
1831}
1832
1833fn build_initial_contacts_update(
1834 contacts: Vec<db::Contact>,
1835 pool: &ConnectionPool,
1836) -> proto::UpdateContacts {
1837 let mut update = proto::UpdateContacts::default();
1838
1839 for contact in contacts {
1840 match contact {
1841 db::Contact::Accepted {
1842 user_id,
1843 should_notify,
1844 busy,
1845 } => {
1846 update
1847 .contacts
1848 .push(contact_for_user(user_id, should_notify, busy, &pool));
1849 }
1850 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
1851 db::Contact::Incoming {
1852 user_id,
1853 should_notify,
1854 } => update
1855 .incoming_requests
1856 .push(proto::IncomingContactRequest {
1857 requester_id: user_id.to_proto(),
1858 should_notify,
1859 }),
1860 }
1861 }
1862
1863 update
1864}
1865
1866fn contact_for_user(
1867 user_id: UserId,
1868 should_notify: bool,
1869 busy: bool,
1870 pool: &ConnectionPool,
1871) -> proto::Contact {
1872 proto::Contact {
1873 user_id: user_id.to_proto(),
1874 online: pool.is_user_online(user_id),
1875 busy,
1876 should_notify,
1877 }
1878}
1879
1880fn room_updated(room: &proto::Room, peer: &Peer) {
1881 for participant in &room.participants {
1882 peer.send(
1883 ConnectionId(participant.peer_id),
1884 proto::RoomUpdated {
1885 room: Some(room.clone()),
1886 },
1887 )
1888 .trace_err();
1889 }
1890}
1891
1892async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
1893 let db = session.db().await;
1894 let contacts = db.get_contacts(user_id).await?;
1895 let busy = db.is_user_busy(user_id).await?;
1896
1897 let pool = session.connection_pool().await;
1898 let updated_contact = contact_for_user(user_id, false, busy, &pool);
1899 for contact in contacts {
1900 if let db::Contact::Accepted {
1901 user_id: contact_user_id,
1902 ..
1903 } = contact
1904 {
1905 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
1906 session
1907 .peer
1908 .send(
1909 contact_conn_id,
1910 proto::UpdateContacts {
1911 contacts: vec![updated_contact.clone()],
1912 remove_contacts: Default::default(),
1913 incoming_requests: Default::default(),
1914 remove_incoming_requests: Default::default(),
1915 outgoing_requests: Default::default(),
1916 remove_outgoing_requests: Default::default(),
1917 },
1918 )
1919 .trace_err();
1920 }
1921 }
1922 }
1923 Ok(())
1924}
1925
1926async fn leave_room_for_session(session: &Session) -> Result<()> {
1927 let mut contacts_to_update = HashSet::default();
1928
1929 let canceled_calls_to_user_ids;
1930 let live_kit_room;
1931 let delete_live_kit_room;
1932 {
1933 let mut left_room = session.db().await.leave_room(session.connection_id).await?;
1934 contacts_to_update.insert(session.user_id);
1935
1936 for project in left_room.left_projects.values() {
1937 project_left(project, session);
1938 }
1939
1940 room_updated(&left_room.room, &session.peer);
1941 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
1942 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
1943 delete_live_kit_room = left_room.room.participants.is_empty();
1944 }
1945
1946 {
1947 let pool = session.connection_pool().await;
1948 for canceled_user_id in canceled_calls_to_user_ids {
1949 for connection_id in pool.user_connection_ids(canceled_user_id) {
1950 session
1951 .peer
1952 .send(connection_id, proto::CallCanceled {})
1953 .trace_err();
1954 }
1955 contacts_to_update.insert(canceled_user_id);
1956 }
1957 }
1958
1959 for contact_user_id in contacts_to_update {
1960 update_user_contacts(contact_user_id, &session).await?;
1961 }
1962
1963 if let Some(live_kit) = session.live_kit_client.as_ref() {
1964 live_kit
1965 .remove_participant(live_kit_room.clone(), session.connection_id.to_string())
1966 .await
1967 .trace_err();
1968
1969 if delete_live_kit_room {
1970 live_kit.delete_room(live_kit_room).await.trace_err();
1971 }
1972 }
1973
1974 Ok(())
1975}
1976
1977fn project_left(project: &db::LeftProject, session: &Session) {
1978 for connection_id in &project.connection_ids {
1979 if project.host_user_id == session.user_id {
1980 session
1981 .peer
1982 .send(
1983 *connection_id,
1984 proto::UnshareProject {
1985 project_id: project.id.to_proto(),
1986 },
1987 )
1988 .trace_err();
1989 } else {
1990 session
1991 .peer
1992 .send(
1993 *connection_id,
1994 proto::RemoveProjectCollaborator {
1995 project_id: project.id.to_proto(),
1996 peer_id: session.connection_id.0,
1997 },
1998 )
1999 .trace_err();
2000 }
2001 }
2002
2003 session
2004 .peer
2005 .send(
2006 session.connection_id,
2007 proto::UnshareProject {
2008 project_id: project.id.to_proto(),
2009 },
2010 )
2011 .trace_err();
2012}
2013
/// Extension trait that turns a fallible value into an `Option` of its
/// success value.
pub trait ResultExt {
    /// The success type returned inside the `Option`.
    type Ok;

    /// Consumes `self` and returns the success value, if there is one.
    fn trace_err(self) -> Option<Self::Ok>;
}
2019
2020impl<T, E> ResultExt for Result<T, E>
2021where
2022 E: std::fmt::Debug,
2023{
2024 type Ok = T;
2025
2026 fn trace_err(self) -> Option<T> {
2027 match self {
2028 Ok(value) => Some(value),
2029 Err(error) => {
2030 tracing::error!("{:?}", error);
2031 None
2032 }
2033 }
2034 }
2035}