1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
6 executor::Executor,
7 AppState, Result,
8};
9use anyhow::anyhow;
10use async_tungstenite::tungstenite::{
11 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
12};
13use axum::{
14 body::Body,
15 extract::{
16 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
17 ConnectInfo, WebSocketUpgrade,
18 },
19 headers::{Header, HeaderName},
20 http::StatusCode,
21 middleware,
22 response::IntoResponse,
23 routing::get,
24 Extension, Router, TypedHeader,
25};
26use collections::{HashMap, HashSet};
27pub use connection_pool::ConnectionPool;
28use futures::{
29 channel::oneshot,
30 future::{self, BoxFuture},
31 stream::FuturesUnordered,
32 FutureExt, SinkExt, StreamExt, TryStreamExt,
33};
34use lazy_static::lazy_static;
35use prometheus::{register_int_gauge, IntGauge};
36use rpc::{
37 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
38 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
39};
40use serde::{Serialize, Serializer};
41use std::{
42 any::TypeId,
43 fmt,
44 future::Future,
45 marker::PhantomData,
46 mem,
47 net::SocketAddr,
48 ops::{Deref, DerefMut},
49 rc::Rc,
50 sync::{
51 atomic::{AtomicBool, Ordering::SeqCst},
52 Arc,
53 },
54 time::{Duration, Instant},
55};
56use tokio::sync::{watch, Semaphore};
57use tower::ServiceBuilder;
58use tracing::{info_span, instrument, Instrument};
59
/// How long `connection_lost` waits after a connection drops before it makes the
/// session leave its room and refreshes the user's contacts.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long `Server::start` waits before it begins cleaning up stale rooms and servers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
62
// Prometheus gauges. Both are refreshed by `handle_metrics` each time it is called.
// Registration failure panics on first access because of the `unwrap()` calls.
lazy_static! {
    // Set to the number of non-admin connections currently in the connection pool.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Set to the value returned by the database's `project_count_excluding_admins`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
72
/// Type-erased handler stored in `Server::handlers`.
///
/// It receives the incoming envelope (still type-erased) together with the `Session` of the
/// connection it arrived on, and returns a boxed future that performs the handling.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
75
/// One-shot handle that a request handler uses to answer the request it was given.
struct Response<R> {
    /// Peer through which the response is delivered.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `Server::add_request_handler` reads it to detect handlers
    /// that returned `Ok` without ever responding.
    responded: Arc<AtomicBool>,
}
81
82impl<R: RequestMessage> Response<R> {
83 fn send(self, payload: R::Response) -> Result<()> {
84 self.responded.store(true, SeqCst);
85 self.peer.respond(self.receipt, payload)?;
86 Ok(())
87 }
88}
89
/// Per-connection context handed to every message handler.
///
/// It is cloned once per handled message (see `Server::handle_connection`); all heavy state
/// is behind `Arc`s.
#[derive(Clone)]
struct Session {
    /// User that authenticated this connection.
    user_id: UserId,
    /// Identifier the peer assigned to this connection.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex created per connection; access it via `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// RPC peer used to send messages and responses.
    peer: Arc<Peer>,
    /// Pool of live connections shared with the `Server`; access it via `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Optional LiveKit API client; `None` disables the LiveKit-related steps in handlers.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
}
100
101impl Session {
102 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
103 #[cfg(test)]
104 tokio::task::yield_now().await;
105 let guard = self.db.lock().await;
106 #[cfg(test)]
107 tokio::task::yield_now().await;
108 guard
109 }
110
111 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
112 #[cfg(test)]
113 tokio::task::yield_now().await;
114 let guard = self.connection_pool.lock();
115 ConnectionPoolGuard {
116 guard,
117 _not_send: PhantomData,
118 }
119 }
120}
121
122impl fmt::Debug for Session {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("Session")
125 .field("user_id", &self.user_id)
126 .field("connection_id", &self.connection_id)
127 .finish()
128 }
129}
130
/// Newtype around the shared `Database`; dereferences to `Database` so callers can invoke
/// its methods directly on a locked `Session::db` guard.
struct DbHandle(Arc<Database>);
132
133impl Deref for DbHandle {
134 type Target = Database;
135
136 fn deref(&self) -> &Self::Target {
137 self.0.as_ref()
138 }
139}
140
/// RPC server state shared by every connection handled in this process.
pub struct Server {
    /// Identifier of this server instance; behind a mutex so the test-only `reset` can replace it.
    id: parking_lot::Mutex<ServerId>,
    /// RPC peer used to register connections and to send messages and responses.
    peer: Arc<Peer>,
    /// Connections currently registered with this server.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Shared application state; this file reads `db`, `config` and `live_kit_client` from it.
    app_state: Arc<AppState>,
    /// Executor used to spawn detached work and to create sleep timers.
    executor: Executor,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Signalled by `teardown`; connection tasks subscribe to it in order to stop.
    teardown: watch::Sender<()>,
}
150
/// Guard over the locked `ConnectionPool`.
///
/// Dereferences to the pool, and in test builds checks the pool's invariants when dropped.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying `parking_lot` mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
155
/// Serializable view of the server produced by `Server::snapshot`.
///
/// It holds the connection-pool lock for as long as the snapshot is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's RPC peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through `serialize_deref`, i.e. as the pool itself.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
162
163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
164where
165 S: Serializer,
166 T: Deref<Target = U>,
167 U: Serialize,
168{
169 Serialize::serialize(value.deref(), serializer)
170}
171
172impl Server {
    /// Creates a server with the given id, application state and executor, and registers
    /// every message and request handler it supports.
    ///
    /// The peer is created from `id.0 as u32`. Handlers are keyed by message type;
    /// `add_handler` panics if the same message type is registered twice, so each type
    /// must appear at most once in the chain below.
    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
        let mut server = Self {
            id: parking_lot::Mutex::new(id),
            peer: Peer::new(id.0 as u32),
            app_state,
            executor,
            connection_pool: Default::default(),
            handlers: Default::default(),
            // Only the sender is kept; receivers are created on demand via `subscribe()`.
            teardown: watch::channel(()).0,
        };

        // `add_request_handler` is for messages that expect a response;
        // `add_message_handler` is for fire-and-forget messages.
        server
            .add_request_handler(ping)
            .add_request_handler(create_room)
            .add_request_handler(join_room)
            .add_request_handler(rejoin_room)
            .add_request_handler(leave_room)
            .add_request_handler(call)
            .add_request_handler(cancel_call)
            .add_message_handler(decline_call)
            .add_request_handler(update_participant_location)
            .add_request_handler(share_project)
            .add_message_handler(unshare_project)
            .add_request_handler(join_project)
            .add_message_handler(leave_project)
            .add_request_handler(update_project)
            .add_request_handler(update_worktree)
            .add_message_handler(start_language_server)
            .add_message_handler(update_language_server)
            .add_message_handler(update_diagnostic_summary)
            .add_message_handler(update_worktree_settings)
            .add_message_handler(refresh_inlay_hints)
            .add_request_handler(forward_project_request::<proto::GetHover>)
            .add_request_handler(forward_project_request::<proto::GetDefinition>)
            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
            .add_request_handler(forward_project_request::<proto::GetReferences>)
            .add_request_handler(forward_project_request::<proto::SearchProject>)
            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(forward_project_request::<proto::GetCompletions>)
            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(forward_project_request::<proto::PrepareRename>)
            .add_request_handler(forward_project_request::<proto::PerformRename>)
            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
            .add_request_handler(forward_project_request::<proto::InlayHints>)
            .add_message_handler(create_buffer_for_peer)
            .add_request_handler(update_buffer)
            .add_message_handler(update_buffer_file)
            .add_message_handler(buffer_reloaded)
            .add_message_handler(buffer_saved)
            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
            .add_request_handler(get_users)
            .add_request_handler(fuzzy_search_users)
            .add_request_handler(request_contact)
            .add_request_handler(remove_contact)
            .add_request_handler(respond_to_contact_request)
            .add_request_handler(follow)
            .add_message_handler(unfollow)
            .add_message_handler(update_followers)
            .add_message_handler(update_diff_base)
            .add_request_handler(get_private_user_info);

        Arc::new(server)
    }
250
    /// Schedules the startup cleanup task and returns immediately.
    ///
    /// The detached task waits for `CLEANUP_TIMEOUT`, then for every stale room reported by
    /// the database it refreshes the room, notifies affected users (canceled calls and
    /// contact updates), deletes the LiveKit room if the refreshed room has no participants
    /// left, and finally asks the database to delete stale servers. Every failure inside
    /// the task is passed to `trace_err()` and does not abort the remaining cleanup.
    ///
    /// This function itself always returns `Ok(())`.
    pub async fn start(&self) -> Result<()> {
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        // The timer is created here, before the task is spawned.
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // Defaults used when refreshing the room fails: nobody is notified
                        // and no LiveKit room is deleted.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            // Both stale participants and users whose calls were canceled
                            // get their contact entry re-broadcast below.
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            // Move the data needed later out of `refreshed_room`, which is
                            // dropped at the end of this block.
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // Scoped so the pool lock is released before the next `.await`.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Only proceed when both database queries succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    // Only accepted contacts are notified.
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs regardless of whether the stale rooms could be retrieved.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
366
    /// Tears down the peer, resets the connection pool and signals the teardown channel
    /// that `handle_connection` and `connection_lost` listen on.
    pub fn teardown(&self) {
        self.peer.teardown();
        self.connection_pool.lock().reset();
        // The result of the send is deliberately ignored.
        let _ = self.teardown.send(());
    }
372
    /// Test-only: tears the server down and then gives it (and its peer) a new id.
    #[cfg(test)]
    pub fn reset(&self, id: ServerId) {
        self.teardown();
        *self.id.lock() = id;
        self.peer.reset(id.0 as u32);
    }
379
380 #[cfg(test)]
381 pub fn id(&self) -> ServerId {
382 *self.id.lock()
383 }
384
385 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
386 where
387 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
388 Fut: 'static + Send + Future<Output = Result<()>>,
389 M: EnvelopedMessage,
390 {
391 let prev_handler = self.handlers.insert(
392 TypeId::of::<M>(),
393 Box::new(move |envelope, session| {
394 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
395 let span = info_span!(
396 "handle message",
397 payload_type = envelope.payload_type_name()
398 );
399 span.in_scope(|| {
400 tracing::info!(
401 payload_type = envelope.payload_type_name(),
402 "message received"
403 );
404 });
405 let start_time = Instant::now();
406 let future = (handler)(*envelope, session);
407 async move {
408 let result = future.await;
409 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
410 match result {
411 Err(error) => {
412 tracing::error!(%error, ?duration_ms, "error handling message")
413 }
414 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
415 }
416 }
417 .instrument(span)
418 .boxed()
419 }),
420 );
421 if prev_handler.is_some() {
422 panic!("registered a handler for the same message twice");
423 }
424 self
425 }
426
427 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
428 where
429 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
430 Fut: 'static + Send + Future<Output = Result<()>>,
431 M: EnvelopedMessage,
432 {
433 self.add_handler(move |envelope, session| handler(envelope.payload, session));
434 self
435 }
436
437 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
438 where
439 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
440 Fut: Send + Future<Output = Result<()>>,
441 M: RequestMessage,
442 {
443 let handler = Arc::new(handler);
444 self.add_handler(move |envelope, session| {
445 let receipt = envelope.receipt();
446 let handler = handler.clone();
447 async move {
448 let peer = session.peer.clone();
449 let responded = Arc::new(AtomicBool::default());
450 let response = Response {
451 peer: peer.clone(),
452 responded: responded.clone(),
453 receipt,
454 };
455 match (handler)(envelope.payload, response, session).await {
456 Ok(()) => {
457 if responded.load(std::sync::atomic::Ordering::SeqCst) {
458 Ok(())
459 } else {
460 Err(anyhow!("handler did not send a response"))?
461 }
462 }
463 Err(error) => {
464 peer.respond_with_error(
465 receipt,
466 proto::Error {
467 message: error.to_string(),
468 },
469 )?;
470 Err(error)
471 }
472 }
473 }
474 })
475 }
476
    /// Returns a future that drives one client connection from handshake to sign-out.
    ///
    /// The future:
    /// 1. registers the connection with the peer and sends `Hello`;
    /// 2. reports the connection id through `send_connection_id`, if provided;
    /// 3. sends `ShowContacts` to users connecting for the first time and records that
    ///    they have connected once;
    /// 4. adds the connection to the pool and sends the initial contacts, invite info and
    ///    any pending incoming call;
    /// 5. dispatches incoming messages to the registered handlers until the connection
    ///    closes, I/O finishes, or the server is torn down;
    /// 6. unless torn down, runs `connection_lost` for the session.
    ///
    /// Errors during steps 1-4 are propagated with `?`. I/O and sign-out errors are only
    /// logged, and the future then resolves to `Ok(())`.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribe before the future is first polled.
        let mut teardown = self.teardown.subscribe();
        async move {
            // The closure lets the peer create timers through our executor.
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            // A dropped receiver is not an error.
            if let Some(send_connection_id) = send_connection_id.take() {
                let _ = send_connection_id.send(connection_id);
            }

            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            // Both queries run concurrently; either failure aborts the connection setup.
            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // Scoped so the pool lock is released before the next `.await`.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            // Each connection gets its own async mutex around the shared database handle.
            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages that arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // A permit is acquired before each message is read and released when its handler
            // finishes, so at most 256 handlers are in flight for this connection.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // Biased: branches are polled in the order written — teardown, I/O,
                // in-flight foreground handlers, then the next incoming message.
                futures::select_biased! {
                    // Server teardown: return without running the sign-out path below.
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                // Exit the span before it is moved into `instrument` below.
                                drop(span_enter);

                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                // Background messages run detached on the executor; foreground
                                // ones are driven by this loop.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Drop any foreground handlers that are still pending before signing out.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
613
614 pub async fn invite_code_redeemed(
615 self: &Arc<Self>,
616 inviter_id: UserId,
617 invitee_id: UserId,
618 ) -> Result<()> {
619 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
620 if let Some(code) = &user.invite_code {
621 let pool = self.connection_pool.lock();
622 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
623 for connection_id in pool.user_connection_ids(inviter_id) {
624 self.peer.send(
625 connection_id,
626 proto::UpdateContacts {
627 contacts: vec![invitee_contact.clone()],
628 ..Default::default()
629 },
630 )?;
631 self.peer.send(
632 connection_id,
633 proto::UpdateInviteInfo {
634 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
635 count: user.invite_count as u32,
636 },
637 )?;
638 }
639 }
640 }
641 Ok(())
642 }
643
644 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
645 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
646 if let Some(invite_code) = &user.invite_code {
647 let pool = self.connection_pool.lock();
648 for connection_id in pool.user_connection_ids(user_id) {
649 self.peer.send(
650 connection_id,
651 proto::UpdateInviteInfo {
652 url: format!(
653 "{}{}",
654 self.app_state.config.invite_link_prefix, invite_code
655 ),
656 count: user.invite_count as u32,
657 },
658 )?;
659 }
660 }
661 }
662 Ok(())
663 }
664
665 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
666 ServerSnapshot {
667 connection_pool: ConnectionPoolGuard {
668 guard: self.connection_pool.lock(),
669 _not_send: PhantomData,
670 },
671 peer: &self.peer,
672 }
673 }
674}
675
676impl<'a> Deref for ConnectionPoolGuard<'a> {
677 type Target = ConnectionPool;
678
679 fn deref(&self) -> &Self::Target {
680 &*self.guard
681 }
682}
683
684impl<'a> DerefMut for ConnectionPoolGuard<'a> {
685 fn deref_mut(&mut self) -> &mut Self::Target {
686 &mut *self.guard
687 }
688}
689
impl<'a> Drop for ConnectionPoolGuard<'a> {
    /// In test builds, calls `check_invariants` every time a guard is released
    /// (presumably the pool's own method, reached through `Deref` — confirm in
    /// `connection_pool`). In non-test builds this does nothing.
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
696
697fn broadcast<F>(
698 sender_id: Option<ConnectionId>,
699 receiver_ids: impl IntoIterator<Item = ConnectionId>,
700 mut f: F,
701) where
702 F: FnMut(ConnectionId) -> anyhow::Result<()>,
703{
704 for receiver_id in receiver_ids {
705 if Some(receiver_id) != sender_id {
706 if let Err(error) = f(receiver_id) {
707 tracing::error!("failed to send to {:?} {}", receiver_id, error);
708 }
709 }
710 }
711}
712
lazy_static! {
    // Name of the HTTP header that `ProtocolVersion` is decoded from and encoded to.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
716
/// Typed `x-zed-protocol-version` header: the RPC protocol version announced by a client.
pub struct ProtocolVersion(u32);
718
719impl Header for ProtocolVersion {
720 fn name() -> &'static HeaderName {
721 &ZED_PROTOCOL_VERSION
722 }
723
724 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
725 where
726 Self: Sized,
727 I: Iterator<Item = &'i axum::http::HeaderValue>,
728 {
729 let version = values
730 .next()
731 .ok_or_else(axum::headers::Error::invalid)?
732 .to_str()
733 .map_err(|_| axum::headers::Error::invalid())?
734 .parse()
735 .map_err(|_| axum::headers::Error::invalid())?;
736 Ok(Self(version))
737 }
738
739 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
740 values.extend([self.0.to_string().parse().unwrap()]);
741 }
742}
743
744pub fn routes(server: Arc<Server>) -> Router<Body> {
745 Router::new()
746 .route("/rpc", get(handle_websocket_request))
747 .layer(
748 ServiceBuilder::new()
749 .layer(Extension(server.app_state.clone()))
750 .layer(middleware::from_fn(auth::validate_header)),
751 )
752 .route("/metrics", get(handle_metrics))
753 .layer(Extension(server))
754}
755
756pub async fn handle_websocket_request(
757 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
758 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
759 Extension(server): Extension<Arc<Server>>,
760 Extension(user): Extension<User>,
761 ws: WebSocketUpgrade,
762) -> axum::response::Response {
763 if protocol_version != rpc::PROTOCOL_VERSION {
764 return (
765 StatusCode::UPGRADE_REQUIRED,
766 "client must be upgraded".to_string(),
767 )
768 .into_response();
769 }
770 let socket_address = socket_address.to_string();
771 ws.on_upgrade(move |socket| {
772 use util::ResultExt;
773 let socket = socket
774 .map_ok(to_tungstenite_message)
775 .err_into()
776 .with(|message| async move { Ok(to_axum_message(message)) });
777 let connection = Connection::new(Box::pin(socket));
778 async move {
779 server
780 .handle_connection(connection, socket_address, user, None, Executor::Production)
781 .await
782 .log_err();
783 }
784 })
785}
786
787pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
788 let connections = server
789 .connection_pool
790 .lock()
791 .connections()
792 .filter(|connection| !connection.admin)
793 .count();
794
795 METRIC_CONNECTIONS.set(connections as _);
796
797 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
798 METRIC_SHARED_PROJECTS.set(shared_projects as _);
799
800 let encoder = prometheus::TextEncoder::new();
801 let metric_families = prometheus::gather();
802 let encoded_metrics = encoder
803 .encode_to_string(&metric_families)
804 .map_err(|err| anyhow!("{}", err))?;
805 Ok(encoded_metrics)
806}
807
/// Cleans up after a connection has gone away.
///
/// Immediately disconnects the peer connection, removes it from the pool and records the
/// loss in the database (a database failure there is only traced). It then waits for
/// whichever comes first:
/// - `RECONNECT_TIMEOUT` elapses: the session leaves its room; if the user has no
///   remaining connections, any call to them is declined; then the user's contacts are
///   updated.
/// - the server is torn down: nothing further is done.
///
/// # Errors
///
/// Fails when the connection cannot be removed from the pool or the final contacts
/// update fails.
#[instrument(err, skip(executor))]
async fn connection_lost(
    session: Session,
    mut teardown: watch::Receiver<()>,
    executor: Executor,
) -> Result<()> {
    session.peer.disconnect(session.connection_id);
    session
        .connection_pool()
        .await
        .remove_connection(session.connection_id)?;

    session
        .db()
        .await
        .connection_lost(session.connection_id)
        .await
        .trace_err();

    // Biased: if both are ready, the reconnect timeout branch is taken first.
    futures::select_biased! {
        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
            leave_room_for_session(&session).await.trace_err();

            // The pool guard is a temporary of the condition and is released before the
            // database is locked below.
            if !session
                .connection_pool()
                .await
                .is_user_online(session.user_id)
            {
                let db = session.db().await;
                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
                    room_updated(&room, &session.peer);
                }
            }
            update_user_contacts(session.user_id, &session).await?;
        }
        _ = teardown.changed().fuse() => {}
    }

    Ok(())
}
848
849async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
850 response.send(proto::Ack {})?;
851 Ok(())
852}
853
854async fn create_room(
855 _request: proto::CreateRoom,
856 response: Response<proto::CreateRoom>,
857 session: Session,
858) -> Result<()> {
859 let live_kit_room = nanoid::nanoid!(30);
860 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
861 if let Some(_) = live_kit
862 .create_room(live_kit_room.clone())
863 .await
864 .trace_err()
865 {
866 if let Some(token) = live_kit
867 .room_token(&live_kit_room, &session.user_id.to_string())
868 .trace_err()
869 {
870 Some(proto::LiveKitConnectionInfo {
871 server_url: live_kit.url().into(),
872 token,
873 })
874 } else {
875 None
876 }
877 } else {
878 None
879 }
880 } else {
881 None
882 };
883
884 {
885 let room = session
886 .db()
887 .await
888 .create_room(session.user_id, session.connection_id, &live_kit_room)
889 .await?;
890
891 response.send(proto::CreateRoomResponse {
892 room: Some(room.clone()),
893 live_kit_connection_info,
894 })?;
895 }
896
897 update_user_contacts(session.user_id, &session).await?;
898 Ok(())
899}
900
901async fn join_room(
902 request: proto::JoinRoom,
903 response: Response<proto::JoinRoom>,
904 session: Session,
905) -> Result<()> {
906 let room_id = RoomId::from_proto(request.id);
907 let room = {
908 let room = session
909 .db()
910 .await
911 .join_room(room_id, session.user_id, session.connection_id)
912 .await?;
913 room_updated(&room, &session.peer);
914 room.clone()
915 };
916
917 for connection_id in session
918 .connection_pool()
919 .await
920 .user_connection_ids(session.user_id)
921 {
922 session
923 .peer
924 .send(
925 connection_id,
926 proto::CallCanceled {
927 room_id: room_id.to_proto(),
928 },
929 )
930 .trace_err();
931 }
932
933 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
934 if let Some(token) = live_kit
935 .room_token(&room.live_kit_room, &session.user_id.to_string())
936 .trace_err()
937 {
938 Some(proto::LiveKitConnectionInfo {
939 server_url: live_kit.url().into(),
940 token,
941 })
942 } else {
943 None
944 }
945 } else {
946 None
947 };
948
949 response.send(proto::JoinRoomResponse {
950 room: Some(room),
951 live_kit_connection_info,
952 })?;
953
954 update_user_contacts(session.user_id, &session).await?;
955 Ok(())
956}
957
958async fn rejoin_room(
959 request: proto::RejoinRoom,
960 response: Response<proto::RejoinRoom>,
961 session: Session,
962) -> Result<()> {
963 {
964 let mut rejoined_room = session
965 .db()
966 .await
967 .rejoin_room(request, session.user_id, session.connection_id)
968 .await?;
969
970 response.send(proto::RejoinRoomResponse {
971 room: Some(rejoined_room.room.clone()),
972 reshared_projects: rejoined_room
973 .reshared_projects
974 .iter()
975 .map(|project| proto::ResharedProject {
976 id: project.id.to_proto(),
977 collaborators: project
978 .collaborators
979 .iter()
980 .map(|collaborator| collaborator.to_proto())
981 .collect(),
982 })
983 .collect(),
984 rejoined_projects: rejoined_room
985 .rejoined_projects
986 .iter()
987 .map(|rejoined_project| proto::RejoinedProject {
988 id: rejoined_project.id.to_proto(),
989 worktrees: rejoined_project
990 .worktrees
991 .iter()
992 .map(|worktree| proto::WorktreeMetadata {
993 id: worktree.id,
994 root_name: worktree.root_name.clone(),
995 visible: worktree.visible,
996 abs_path: worktree.abs_path.clone(),
997 })
998 .collect(),
999 collaborators: rejoined_project
1000 .collaborators
1001 .iter()
1002 .map(|collaborator| collaborator.to_proto())
1003 .collect(),
1004 language_servers: rejoined_project.language_servers.clone(),
1005 })
1006 .collect(),
1007 })?;
1008 room_updated(&rejoined_room.room, &session.peer);
1009
1010 for project in &rejoined_room.reshared_projects {
1011 for collaborator in &project.collaborators {
1012 session
1013 .peer
1014 .send(
1015 collaborator.connection_id,
1016 proto::UpdateProjectCollaborator {
1017 project_id: project.id.to_proto(),
1018 old_peer_id: Some(project.old_connection_id.into()),
1019 new_peer_id: Some(session.connection_id.into()),
1020 },
1021 )
1022 .trace_err();
1023 }
1024
1025 broadcast(
1026 Some(session.connection_id),
1027 project
1028 .collaborators
1029 .iter()
1030 .map(|collaborator| collaborator.connection_id),
1031 |connection_id| {
1032 session.peer.forward_send(
1033 session.connection_id,
1034 connection_id,
1035 proto::UpdateProject {
1036 project_id: project.id.to_proto(),
1037 worktrees: project.worktrees.clone(),
1038 },
1039 )
1040 },
1041 );
1042 }
1043
1044 for project in &rejoined_room.rejoined_projects {
1045 for collaborator in &project.collaborators {
1046 session
1047 .peer
1048 .send(
1049 collaborator.connection_id,
1050 proto::UpdateProjectCollaborator {
1051 project_id: project.id.to_proto(),
1052 old_peer_id: Some(project.old_connection_id.into()),
1053 new_peer_id: Some(session.connection_id.into()),
1054 },
1055 )
1056 .trace_err();
1057 }
1058 }
1059
1060 for project in &mut rejoined_room.rejoined_projects {
1061 for worktree in mem::take(&mut project.worktrees) {
1062 #[cfg(any(test, feature = "test-support"))]
1063 const MAX_CHUNK_SIZE: usize = 2;
1064 #[cfg(not(any(test, feature = "test-support")))]
1065 const MAX_CHUNK_SIZE: usize = 256;
1066
1067 // Stream this worktree's entries.
1068 let message = proto::UpdateWorktree {
1069 project_id: project.id.to_proto(),
1070 worktree_id: worktree.id,
1071 abs_path: worktree.abs_path.clone(),
1072 root_name: worktree.root_name,
1073 updated_entries: worktree.updated_entries,
1074 removed_entries: worktree.removed_entries,
1075 scan_id: worktree.scan_id,
1076 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1077 updated_repositories: worktree.updated_repositories,
1078 removed_repositories: worktree.removed_repositories,
1079 };
1080 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1081 session.peer.send(session.connection_id, update.clone())?;
1082 }
1083
1084 // Stream this worktree's diagnostics.
1085 for summary in worktree.diagnostic_summaries {
1086 session.peer.send(
1087 session.connection_id,
1088 proto::UpdateDiagnosticSummary {
1089 project_id: project.id.to_proto(),
1090 worktree_id: worktree.id,
1091 summary: Some(summary),
1092 },
1093 )?;
1094 }
1095
1096 for settings_file in worktree.settings_files {
1097 session.peer.send(
1098 session.connection_id,
1099 proto::UpdateWorktreeSettings {
1100 project_id: project.id.to_proto(),
1101 worktree_id: worktree.id,
1102 path: settings_file.path,
1103 content: Some(settings_file.content),
1104 },
1105 )?;
1106 }
1107 }
1108
1109 for language_server in &project.language_servers {
1110 session.peer.send(
1111 session.connection_id,
1112 proto::UpdateLanguageServer {
1113 project_id: project.id.to_proto(),
1114 language_server_id: language_server.id,
1115 variant: Some(
1116 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1117 proto::LspDiskBasedDiagnosticsUpdated {},
1118 ),
1119 ),
1120 },
1121 )?;
1122 }
1123 }
1124 }
1125
1126 update_user_contacts(session.user_id, &session).await?;
1127 Ok(())
1128}
1129
1130async fn leave_room(
1131 _: proto::LeaveRoom,
1132 response: Response<proto::LeaveRoom>,
1133 session: Session,
1134) -> Result<()> {
1135 leave_room_for_session(&session).await?;
1136 response.send(proto::Ack {})?;
1137 Ok(())
1138}
1139
1140async fn call(
1141 request: proto::Call,
1142 response: Response<proto::Call>,
1143 session: Session,
1144) -> Result<()> {
1145 let room_id = RoomId::from_proto(request.room_id);
1146 let calling_user_id = session.user_id;
1147 let calling_connection_id = session.connection_id;
1148 let called_user_id = UserId::from_proto(request.called_user_id);
1149 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1150 if !session
1151 .db()
1152 .await
1153 .has_contact(calling_user_id, called_user_id)
1154 .await?
1155 {
1156 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1157 }
1158
1159 let incoming_call = {
1160 let (room, incoming_call) = &mut *session
1161 .db()
1162 .await
1163 .call(
1164 room_id,
1165 calling_user_id,
1166 calling_connection_id,
1167 called_user_id,
1168 initial_project_id,
1169 )
1170 .await?;
1171 room_updated(&room, &session.peer);
1172 mem::take(incoming_call)
1173 };
1174 update_user_contacts(called_user_id, &session).await?;
1175
1176 let mut calls = session
1177 .connection_pool()
1178 .await
1179 .user_connection_ids(called_user_id)
1180 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1181 .collect::<FuturesUnordered<_>>();
1182
1183 while let Some(call_response) = calls.next().await {
1184 match call_response.as_ref() {
1185 Ok(_) => {
1186 response.send(proto::Ack {})?;
1187 return Ok(());
1188 }
1189 Err(_) => {
1190 call_response.trace_err();
1191 }
1192 }
1193 }
1194
1195 {
1196 let room = session
1197 .db()
1198 .await
1199 .call_failed(room_id, called_user_id)
1200 .await?;
1201 room_updated(&room, &session.peer);
1202 }
1203 update_user_contacts(called_user_id, &session).await?;
1204
1205 Err(anyhow!("failed to ring user"))?
1206}
1207
1208async fn cancel_call(
1209 request: proto::CancelCall,
1210 response: Response<proto::CancelCall>,
1211 session: Session,
1212) -> Result<()> {
1213 let called_user_id = UserId::from_proto(request.called_user_id);
1214 let room_id = RoomId::from_proto(request.room_id);
1215 {
1216 let room = session
1217 .db()
1218 .await
1219 .cancel_call(room_id, session.connection_id, called_user_id)
1220 .await?;
1221 room_updated(&room, &session.peer);
1222 }
1223
1224 for connection_id in session
1225 .connection_pool()
1226 .await
1227 .user_connection_ids(called_user_id)
1228 {
1229 session
1230 .peer
1231 .send(
1232 connection_id,
1233 proto::CallCanceled {
1234 room_id: room_id.to_proto(),
1235 },
1236 )
1237 .trace_err();
1238 }
1239 response.send(proto::Ack {})?;
1240
1241 update_user_contacts(called_user_id, &session).await?;
1242 Ok(())
1243}
1244
1245async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1246 let room_id = RoomId::from_proto(message.room_id);
1247 {
1248 let room = session
1249 .db()
1250 .await
1251 .decline_call(Some(room_id), session.user_id)
1252 .await?
1253 .ok_or_else(|| anyhow!("failed to decline call"))?;
1254 room_updated(&room, &session.peer);
1255 }
1256
1257 for connection_id in session
1258 .connection_pool()
1259 .await
1260 .user_connection_ids(session.user_id)
1261 {
1262 session
1263 .peer
1264 .send(
1265 connection_id,
1266 proto::CallCanceled {
1267 room_id: room_id.to_proto(),
1268 },
1269 )
1270 .trace_err();
1271 }
1272 update_user_contacts(session.user_id, &session).await?;
1273 Ok(())
1274}
1275
1276async fn update_participant_location(
1277 request: proto::UpdateParticipantLocation,
1278 response: Response<proto::UpdateParticipantLocation>,
1279 session: Session,
1280) -> Result<()> {
1281 let room_id = RoomId::from_proto(request.room_id);
1282 let location = request
1283 .location
1284 .ok_or_else(|| anyhow!("invalid location"))?;
1285 let room = session
1286 .db()
1287 .await
1288 .update_room_participant_location(room_id, session.connection_id, location)
1289 .await?;
1290 room_updated(&room, &session.peer);
1291 response.send(proto::Ack {})?;
1292 Ok(())
1293}
1294
1295async fn share_project(
1296 request: proto::ShareProject,
1297 response: Response<proto::ShareProject>,
1298 session: Session,
1299) -> Result<()> {
1300 let (project_id, room) = &*session
1301 .db()
1302 .await
1303 .share_project(
1304 RoomId::from_proto(request.room_id),
1305 session.connection_id,
1306 &request.worktrees,
1307 )
1308 .await?;
1309 response.send(proto::ShareProjectResponse {
1310 project_id: project_id.to_proto(),
1311 })?;
1312 room_updated(&room, &session.peer);
1313
1314 Ok(())
1315}
1316
1317async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1318 let project_id = ProjectId::from_proto(message.project_id);
1319
1320 let (room, guest_connection_ids) = &*session
1321 .db()
1322 .await
1323 .unshare_project(project_id, session.connection_id)
1324 .await?;
1325
1326 broadcast(
1327 Some(session.connection_id),
1328 guest_connection_ids.iter().copied(),
1329 |conn_id| session.peer.send(conn_id, message.clone()),
1330 );
1331 room_updated(&room, &session.peer);
1332
1333 Ok(())
1334}
1335
1336async fn join_project(
1337 request: proto::JoinProject,
1338 response: Response<proto::JoinProject>,
1339 session: Session,
1340) -> Result<()> {
1341 let project_id = ProjectId::from_proto(request.project_id);
1342 let guest_user_id = session.user_id;
1343
1344 tracing::info!(%project_id, "join project");
1345
1346 let (project, replica_id) = &mut *session
1347 .db()
1348 .await
1349 .join_project(project_id, session.connection_id)
1350 .await?;
1351
1352 let collaborators = project
1353 .collaborators
1354 .iter()
1355 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1356 .map(|collaborator| collaborator.to_proto())
1357 .collect::<Vec<_>>();
1358
1359 let worktrees = project
1360 .worktrees
1361 .iter()
1362 .map(|(id, worktree)| proto::WorktreeMetadata {
1363 id: *id,
1364 root_name: worktree.root_name.clone(),
1365 visible: worktree.visible,
1366 abs_path: worktree.abs_path.clone(),
1367 })
1368 .collect::<Vec<_>>();
1369
1370 for collaborator in &collaborators {
1371 session
1372 .peer
1373 .send(
1374 collaborator.peer_id.unwrap().into(),
1375 proto::AddProjectCollaborator {
1376 project_id: project_id.to_proto(),
1377 collaborator: Some(proto::Collaborator {
1378 peer_id: Some(session.connection_id.into()),
1379 replica_id: replica_id.0 as u32,
1380 user_id: guest_user_id.to_proto(),
1381 }),
1382 },
1383 )
1384 .trace_err();
1385 }
1386
1387 // First, we send the metadata associated with each worktree.
1388 response.send(proto::JoinProjectResponse {
1389 worktrees: worktrees.clone(),
1390 replica_id: replica_id.0 as u32,
1391 collaborators: collaborators.clone(),
1392 language_servers: project.language_servers.clone(),
1393 })?;
1394
1395 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1396 #[cfg(any(test, feature = "test-support"))]
1397 const MAX_CHUNK_SIZE: usize = 2;
1398 #[cfg(not(any(test, feature = "test-support")))]
1399 const MAX_CHUNK_SIZE: usize = 256;
1400
1401 // Stream this worktree's entries.
1402 let message = proto::UpdateWorktree {
1403 project_id: project_id.to_proto(),
1404 worktree_id,
1405 abs_path: worktree.abs_path.clone(),
1406 root_name: worktree.root_name,
1407 updated_entries: worktree.entries,
1408 removed_entries: Default::default(),
1409 scan_id: worktree.scan_id,
1410 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1411 updated_repositories: worktree.repository_entries.into_values().collect(),
1412 removed_repositories: Default::default(),
1413 };
1414 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1415 session.peer.send(session.connection_id, update.clone())?;
1416 }
1417
1418 // Stream this worktree's diagnostics.
1419 for summary in worktree.diagnostic_summaries {
1420 session.peer.send(
1421 session.connection_id,
1422 proto::UpdateDiagnosticSummary {
1423 project_id: project_id.to_proto(),
1424 worktree_id: worktree.id,
1425 summary: Some(summary),
1426 },
1427 )?;
1428 }
1429
1430 for settings_file in worktree.settings_files {
1431 session.peer.send(
1432 session.connection_id,
1433 proto::UpdateWorktreeSettings {
1434 project_id: project_id.to_proto(),
1435 worktree_id: worktree.id,
1436 path: settings_file.path,
1437 content: Some(settings_file.content),
1438 },
1439 )?;
1440 }
1441 }
1442
1443 for language_server in &project.language_servers {
1444 session.peer.send(
1445 session.connection_id,
1446 proto::UpdateLanguageServer {
1447 project_id: project_id.to_proto(),
1448 language_server_id: language_server.id,
1449 variant: Some(
1450 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1451 proto::LspDiskBasedDiagnosticsUpdated {},
1452 ),
1453 ),
1454 },
1455 )?;
1456 }
1457
1458 Ok(())
1459}
1460
1461async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1462 let sender_id = session.connection_id;
1463 let project_id = ProjectId::from_proto(request.project_id);
1464
1465 let (room, project) = &*session
1466 .db()
1467 .await
1468 .leave_project(project_id, sender_id)
1469 .await?;
1470 tracing::info!(
1471 %project_id,
1472 host_user_id = %project.host_user_id,
1473 host_connection_id = %project.host_connection_id,
1474 "leave project"
1475 );
1476
1477 project_left(&project, &session);
1478 room_updated(&room, &session.peer);
1479
1480 Ok(())
1481}
1482
1483async fn update_project(
1484 request: proto::UpdateProject,
1485 response: Response<proto::UpdateProject>,
1486 session: Session,
1487) -> Result<()> {
1488 let project_id = ProjectId::from_proto(request.project_id);
1489 let (room, guest_connection_ids) = &*session
1490 .db()
1491 .await
1492 .update_project(project_id, session.connection_id, &request.worktrees)
1493 .await?;
1494 broadcast(
1495 Some(session.connection_id),
1496 guest_connection_ids.iter().copied(),
1497 |connection_id| {
1498 session
1499 .peer
1500 .forward_send(session.connection_id, connection_id, request.clone())
1501 },
1502 );
1503 room_updated(&room, &session.peer);
1504 response.send(proto::Ack {})?;
1505
1506 Ok(())
1507}
1508
1509async fn update_worktree(
1510 request: proto::UpdateWorktree,
1511 response: Response<proto::UpdateWorktree>,
1512 session: Session,
1513) -> Result<()> {
1514 let guest_connection_ids = session
1515 .db()
1516 .await
1517 .update_worktree(&request, session.connection_id)
1518 .await?;
1519
1520 broadcast(
1521 Some(session.connection_id),
1522 guest_connection_ids.iter().copied(),
1523 |connection_id| {
1524 session
1525 .peer
1526 .forward_send(session.connection_id, connection_id, request.clone())
1527 },
1528 );
1529 response.send(proto::Ack {})?;
1530 Ok(())
1531}
1532
1533async fn update_diagnostic_summary(
1534 message: proto::UpdateDiagnosticSummary,
1535 session: Session,
1536) -> Result<()> {
1537 let guest_connection_ids = session
1538 .db()
1539 .await
1540 .update_diagnostic_summary(&message, session.connection_id)
1541 .await?;
1542
1543 broadcast(
1544 Some(session.connection_id),
1545 guest_connection_ids.iter().copied(),
1546 |connection_id| {
1547 session
1548 .peer
1549 .forward_send(session.connection_id, connection_id, message.clone())
1550 },
1551 );
1552
1553 Ok(())
1554}
1555
1556async fn update_worktree_settings(
1557 message: proto::UpdateWorktreeSettings,
1558 session: Session,
1559) -> Result<()> {
1560 let guest_connection_ids = session
1561 .db()
1562 .await
1563 .update_worktree_settings(&message, session.connection_id)
1564 .await?;
1565
1566 broadcast(
1567 Some(session.connection_id),
1568 guest_connection_ids.iter().copied(),
1569 |connection_id| {
1570 session
1571 .peer
1572 .forward_send(session.connection_id, connection_id, message.clone())
1573 },
1574 );
1575
1576 Ok(())
1577}
1578
1579async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1580 broadcast_project_message(request.project_id, request, session).await
1581}
1582
1583async fn start_language_server(
1584 request: proto::StartLanguageServer,
1585 session: Session,
1586) -> Result<()> {
1587 let guest_connection_ids = session
1588 .db()
1589 .await
1590 .start_language_server(&request, session.connection_id)
1591 .await?;
1592
1593 broadcast(
1594 Some(session.connection_id),
1595 guest_connection_ids.iter().copied(),
1596 |connection_id| {
1597 session
1598 .peer
1599 .forward_send(session.connection_id, connection_id, request.clone())
1600 },
1601 );
1602 Ok(())
1603}
1604
1605async fn update_language_server(
1606 request: proto::UpdateLanguageServer,
1607 session: Session,
1608) -> Result<()> {
1609 session.executor.record_backtrace();
1610 let project_id = ProjectId::from_proto(request.project_id);
1611 let project_connection_ids = session
1612 .db()
1613 .await
1614 .project_connection_ids(project_id, session.connection_id)
1615 .await?;
1616 broadcast(
1617 Some(session.connection_id),
1618 project_connection_ids.iter().copied(),
1619 |connection_id| {
1620 session
1621 .peer
1622 .forward_send(session.connection_id, connection_id, request.clone())
1623 },
1624 );
1625 Ok(())
1626}
1627
1628async fn forward_project_request<T>(
1629 request: T,
1630 response: Response<T>,
1631 session: Session,
1632) -> Result<()>
1633where
1634 T: EntityMessage + RequestMessage,
1635{
1636 session.executor.record_backtrace();
1637 let project_id = ProjectId::from_proto(request.remote_entity_id());
1638 let host_connection_id = {
1639 let collaborators = session
1640 .db()
1641 .await
1642 .project_collaborators(project_id, session.connection_id)
1643 .await?;
1644 collaborators
1645 .iter()
1646 .find(|collaborator| collaborator.is_host)
1647 .ok_or_else(|| anyhow!("host not found"))?
1648 .connection_id
1649 };
1650
1651 let payload = session
1652 .peer
1653 .forward_request(session.connection_id, host_connection_id, request)
1654 .await?;
1655
1656 response.send(payload)?;
1657 Ok(())
1658}
1659
1660async fn create_buffer_for_peer(
1661 request: proto::CreateBufferForPeer,
1662 session: Session,
1663) -> Result<()> {
1664 session.executor.record_backtrace();
1665 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1666 session
1667 .peer
1668 .forward_send(session.connection_id, peer_id.into(), request)?;
1669 Ok(())
1670}
1671
1672async fn update_buffer(
1673 request: proto::UpdateBuffer,
1674 response: Response<proto::UpdateBuffer>,
1675 session: Session,
1676) -> Result<()> {
1677 session.executor.record_backtrace();
1678 let project_id = ProjectId::from_proto(request.project_id);
1679 let mut guest_connection_ids;
1680 let mut host_connection_id = None;
1681 {
1682 let collaborators = session
1683 .db()
1684 .await
1685 .project_collaborators(project_id, session.connection_id)
1686 .await?;
1687 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1688 for collaborator in collaborators.iter() {
1689 if collaborator.is_host {
1690 host_connection_id = Some(collaborator.connection_id);
1691 } else {
1692 guest_connection_ids.push(collaborator.connection_id);
1693 }
1694 }
1695 }
1696 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1697
1698 session.executor.record_backtrace();
1699 broadcast(
1700 Some(session.connection_id),
1701 guest_connection_ids,
1702 |connection_id| {
1703 session
1704 .peer
1705 .forward_send(session.connection_id, connection_id, request.clone())
1706 },
1707 );
1708 if host_connection_id != session.connection_id {
1709 session
1710 .peer
1711 .forward_request(session.connection_id, host_connection_id, request.clone())
1712 .await?;
1713 }
1714
1715 response.send(proto::Ack {})?;
1716 Ok(())
1717}
1718
1719async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1720 let project_id = ProjectId::from_proto(request.project_id);
1721 let project_connection_ids = session
1722 .db()
1723 .await
1724 .project_connection_ids(project_id, session.connection_id)
1725 .await?;
1726
1727 broadcast(
1728 Some(session.connection_id),
1729 project_connection_ids.iter().copied(),
1730 |connection_id| {
1731 session
1732 .peer
1733 .forward_send(session.connection_id, connection_id, request.clone())
1734 },
1735 );
1736 Ok(())
1737}
1738
1739async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1740 let project_id = ProjectId::from_proto(request.project_id);
1741 let project_connection_ids = session
1742 .db()
1743 .await
1744 .project_connection_ids(project_id, session.connection_id)
1745 .await?;
1746 broadcast(
1747 Some(session.connection_id),
1748 project_connection_ids.iter().copied(),
1749 |connection_id| {
1750 session
1751 .peer
1752 .forward_send(session.connection_id, connection_id, request.clone())
1753 },
1754 );
1755 Ok(())
1756}
1757
1758async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1759 broadcast_project_message(request.project_id, request, session).await
1760}
1761
1762async fn broadcast_project_message<T: EnvelopedMessage>(
1763 project_id: u64,
1764 request: T,
1765 session: Session,
1766) -> Result<()> {
1767 let project_id = ProjectId::from_proto(project_id);
1768 let project_connection_ids = session
1769 .db()
1770 .await
1771 .project_connection_ids(project_id, session.connection_id)
1772 .await?;
1773 broadcast(
1774 Some(session.connection_id),
1775 project_connection_ids.iter().copied(),
1776 |connection_id| {
1777 session
1778 .peer
1779 .forward_send(session.connection_id, connection_id, request.clone())
1780 },
1781 );
1782 Ok(())
1783}
1784
1785async fn follow(
1786 request: proto::Follow,
1787 response: Response<proto::Follow>,
1788 session: Session,
1789) -> Result<()> {
1790 let project_id = ProjectId::from_proto(request.project_id);
1791 let leader_id = request
1792 .leader_id
1793 .ok_or_else(|| anyhow!("invalid leader id"))?
1794 .into();
1795 let follower_id = session.connection_id;
1796
1797 {
1798 let project_connection_ids = session
1799 .db()
1800 .await
1801 .project_connection_ids(project_id, session.connection_id)
1802 .await?;
1803
1804 if !project_connection_ids.contains(&leader_id) {
1805 Err(anyhow!("no such peer"))?;
1806 }
1807 }
1808
1809 let mut response_payload = session
1810 .peer
1811 .forward_request(session.connection_id, leader_id, request)
1812 .await?;
1813 response_payload
1814 .views
1815 .retain(|view| view.leader_id != Some(follower_id.into()));
1816 response.send(response_payload)?;
1817
1818 let room = session
1819 .db()
1820 .await
1821 .follow(project_id, leader_id, follower_id)
1822 .await?;
1823 room_updated(&room, &session.peer);
1824
1825 Ok(())
1826}
1827
1828async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1829 let project_id = ProjectId::from_proto(request.project_id);
1830 let leader_id = request
1831 .leader_id
1832 .ok_or_else(|| anyhow!("invalid leader id"))?
1833 .into();
1834 let follower_id = session.connection_id;
1835
1836 if !session
1837 .db()
1838 .await
1839 .project_connection_ids(project_id, session.connection_id)
1840 .await?
1841 .contains(&leader_id)
1842 {
1843 Err(anyhow!("no such peer"))?;
1844 }
1845
1846 session
1847 .peer
1848 .forward_send(session.connection_id, leader_id, request)?;
1849
1850 let room = session
1851 .db()
1852 .await
1853 .unfollow(project_id, leader_id, follower_id)
1854 .await?;
1855 room_updated(&room, &session.peer);
1856
1857 Ok(())
1858}
1859
1860async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1861 let project_id = ProjectId::from_proto(request.project_id);
1862 let project_connection_ids = session
1863 .db
1864 .lock()
1865 .await
1866 .project_connection_ids(project_id, session.connection_id)
1867 .await?;
1868
1869 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1870 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1871 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1872 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1873 });
1874 for follower_peer_id in request.follower_ids.iter().copied() {
1875 let follower_connection_id = follower_peer_id.into();
1876 if project_connection_ids.contains(&follower_connection_id)
1877 && Some(follower_peer_id) != leader_id
1878 {
1879 session.peer.forward_send(
1880 session.connection_id,
1881 follower_connection_id,
1882 request.clone(),
1883 )?;
1884 }
1885 }
1886 Ok(())
1887}
1888
1889async fn get_users(
1890 request: proto::GetUsers,
1891 response: Response<proto::GetUsers>,
1892 session: Session,
1893) -> Result<()> {
1894 let user_ids = request
1895 .user_ids
1896 .into_iter()
1897 .map(UserId::from_proto)
1898 .collect();
1899 let users = session
1900 .db()
1901 .await
1902 .get_users_by_ids(user_ids)
1903 .await?
1904 .into_iter()
1905 .map(|user| proto::User {
1906 id: user.id.to_proto(),
1907 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1908 github_login: user.github_login,
1909 })
1910 .collect();
1911 response.send(proto::UsersResponse { users })?;
1912 Ok(())
1913}
1914
1915async fn fuzzy_search_users(
1916 request: proto::FuzzySearchUsers,
1917 response: Response<proto::FuzzySearchUsers>,
1918 session: Session,
1919) -> Result<()> {
1920 let query = request.query;
1921 let users = match query.len() {
1922 0 => vec![],
1923 1 | 2 => session
1924 .db()
1925 .await
1926 .get_user_by_github_login(&query)
1927 .await?
1928 .into_iter()
1929 .collect(),
1930 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1931 };
1932 let users = users
1933 .into_iter()
1934 .filter(|user| user.id != session.user_id)
1935 .map(|user| proto::User {
1936 id: user.id.to_proto(),
1937 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1938 github_login: user.github_login,
1939 })
1940 .collect();
1941 response.send(proto::UsersResponse { users })?;
1942 Ok(())
1943}
1944
1945async fn request_contact(
1946 request: proto::RequestContact,
1947 response: Response<proto::RequestContact>,
1948 session: Session,
1949) -> Result<()> {
1950 let requester_id = session.user_id;
1951 let responder_id = UserId::from_proto(request.responder_id);
1952 if requester_id == responder_id {
1953 return Err(anyhow!("cannot add yourself as a contact"))?;
1954 }
1955
1956 session
1957 .db()
1958 .await
1959 .send_contact_request(requester_id, responder_id)
1960 .await?;
1961
1962 // Update outgoing contact requests of requester
1963 let mut update = proto::UpdateContacts::default();
1964 update.outgoing_requests.push(responder_id.to_proto());
1965 for connection_id in session
1966 .connection_pool()
1967 .await
1968 .user_connection_ids(requester_id)
1969 {
1970 session.peer.send(connection_id, update.clone())?;
1971 }
1972
1973 // Update incoming contact requests of responder
1974 let mut update = proto::UpdateContacts::default();
1975 update
1976 .incoming_requests
1977 .push(proto::IncomingContactRequest {
1978 requester_id: requester_id.to_proto(),
1979 should_notify: true,
1980 });
1981 for connection_id in session
1982 .connection_pool()
1983 .await
1984 .user_connection_ids(responder_id)
1985 {
1986 session.peer.send(connection_id, update.clone())?;
1987 }
1988
1989 response.send(proto::Ack {})?;
1990 Ok(())
1991}
1992
1993async fn respond_to_contact_request(
1994 request: proto::RespondToContactRequest,
1995 response: Response<proto::RespondToContactRequest>,
1996 session: Session,
1997) -> Result<()> {
1998 let responder_id = session.user_id;
1999 let requester_id = UserId::from_proto(request.requester_id);
2000 let db = session.db().await;
2001 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2002 db.dismiss_contact_notification(responder_id, requester_id)
2003 .await?;
2004 } else {
2005 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2006
2007 db.respond_to_contact_request(responder_id, requester_id, accept)
2008 .await?;
2009 let requester_busy = db.is_user_busy(requester_id).await?;
2010 let responder_busy = db.is_user_busy(responder_id).await?;
2011
2012 let pool = session.connection_pool().await;
2013 // Update responder with new contact
2014 let mut update = proto::UpdateContacts::default();
2015 if accept {
2016 update
2017 .contacts
2018 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2019 }
2020 update
2021 .remove_incoming_requests
2022 .push(requester_id.to_proto());
2023 for connection_id in pool.user_connection_ids(responder_id) {
2024 session.peer.send(connection_id, update.clone())?;
2025 }
2026
2027 // Update requester with new contact
2028 let mut update = proto::UpdateContacts::default();
2029 if accept {
2030 update
2031 .contacts
2032 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2033 }
2034 update
2035 .remove_outgoing_requests
2036 .push(responder_id.to_proto());
2037 for connection_id in pool.user_connection_ids(requester_id) {
2038 session.peer.send(connection_id, update.clone())?;
2039 }
2040 }
2041
2042 response.send(proto::Ack {})?;
2043 Ok(())
2044}
2045
2046async fn remove_contact(
2047 request: proto::RemoveContact,
2048 response: Response<proto::RemoveContact>,
2049 session: Session,
2050) -> Result<()> {
2051 let requester_id = session.user_id;
2052 let responder_id = UserId::from_proto(request.user_id);
2053 let db = session.db().await;
2054 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2055
2056 let pool = session.connection_pool().await;
2057 // Update outgoing contact requests of requester
2058 let mut update = proto::UpdateContacts::default();
2059 if contact_accepted {
2060 update.remove_contacts.push(responder_id.to_proto());
2061 } else {
2062 update
2063 .remove_outgoing_requests
2064 .push(responder_id.to_proto());
2065 }
2066 for connection_id in pool.user_connection_ids(requester_id) {
2067 session.peer.send(connection_id, update.clone())?;
2068 }
2069
2070 // Update incoming contact requests of responder
2071 let mut update = proto::UpdateContacts::default();
2072 if contact_accepted {
2073 update.remove_contacts.push(requester_id.to_proto());
2074 } else {
2075 update
2076 .remove_incoming_requests
2077 .push(requester_id.to_proto());
2078 }
2079 for connection_id in pool.user_connection_ids(responder_id) {
2080 session.peer.send(connection_id, update.clone())?;
2081 }
2082
2083 response.send(proto::Ack {})?;
2084 Ok(())
2085}
2086
2087async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2088 let project_id = ProjectId::from_proto(request.project_id);
2089 let project_connection_ids = session
2090 .db()
2091 .await
2092 .project_connection_ids(project_id, session.connection_id)
2093 .await?;
2094 broadcast(
2095 Some(session.connection_id),
2096 project_connection_ids.iter().copied(),
2097 |connection_id| {
2098 session
2099 .peer
2100 .forward_send(session.connection_id, connection_id, request.clone())
2101 },
2102 );
2103 Ok(())
2104}
2105
2106async fn get_private_user_info(
2107 _request: proto::GetPrivateUserInfo,
2108 response: Response<proto::GetPrivateUserInfo>,
2109 session: Session,
2110) -> Result<()> {
2111 let metrics_id = session
2112 .db()
2113 .await
2114 .get_user_metrics_id(session.user_id)
2115 .await?;
2116 let user = session
2117 .db()
2118 .await
2119 .get_user_by_id(session.user_id)
2120 .await?
2121 .ok_or_else(|| anyhow!("user not found"))?;
2122 response.send(proto::GetPrivateUserInfoResponse {
2123 metrics_id,
2124 staff: user.admin,
2125 })?;
2126 Ok(())
2127}
2128
2129fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2130 match message {
2131 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2132 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2133 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2134 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2135 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2136 code: frame.code.into(),
2137 reason: frame.reason,
2138 })),
2139 }
2140}
2141
2142fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2143 match message {
2144 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2145 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2146 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2147 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2148 AxumMessage::Close(frame) => {
2149 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2150 code: frame.code.into(),
2151 reason: frame.reason,
2152 }))
2153 }
2154 }
2155}
2156
2157fn build_initial_contacts_update(
2158 contacts: Vec<db::Contact>,
2159 pool: &ConnectionPool,
2160) -> proto::UpdateContacts {
2161 let mut update = proto::UpdateContacts::default();
2162
2163 for contact in contacts {
2164 match contact {
2165 db::Contact::Accepted {
2166 user_id,
2167 should_notify,
2168 busy,
2169 } => {
2170 update
2171 .contacts
2172 .push(contact_for_user(user_id, should_notify, busy, &pool));
2173 }
2174 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2175 db::Contact::Incoming {
2176 user_id,
2177 should_notify,
2178 } => update
2179 .incoming_requests
2180 .push(proto::IncomingContactRequest {
2181 requester_id: user_id.to_proto(),
2182 should_notify,
2183 }),
2184 }
2185 }
2186
2187 update
2188}
2189
2190fn contact_for_user(
2191 user_id: UserId,
2192 should_notify: bool,
2193 busy: bool,
2194 pool: &ConnectionPool,
2195) -> proto::Contact {
2196 proto::Contact {
2197 user_id: user_id.to_proto(),
2198 online: pool.is_user_online(user_id),
2199 busy,
2200 should_notify,
2201 }
2202}
2203
2204fn room_updated(room: &proto::Room, peer: &Peer) {
2205 broadcast(
2206 None,
2207 room.participants
2208 .iter()
2209 .filter_map(|participant| Some(participant.peer_id?.into())),
2210 |peer_id| {
2211 peer.send(
2212 peer_id.into(),
2213 proto::RoomUpdated {
2214 room: Some(room.clone()),
2215 },
2216 )
2217 },
2218 );
2219}
2220
2221async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2222 let db = session.db().await;
2223 let contacts = db.get_contacts(user_id).await?;
2224 let busy = db.is_user_busy(user_id).await?;
2225
2226 let pool = session.connection_pool().await;
2227 let updated_contact = contact_for_user(user_id, false, busy, &pool);
2228 for contact in contacts {
2229 if let db::Contact::Accepted {
2230 user_id: contact_user_id,
2231 ..
2232 } = contact
2233 {
2234 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2235 session
2236 .peer
2237 .send(
2238 contact_conn_id,
2239 proto::UpdateContacts {
2240 contacts: vec![updated_contact.clone()],
2241 remove_contacts: Default::default(),
2242 incoming_requests: Default::default(),
2243 remove_incoming_requests: Default::default(),
2244 outgoing_requests: Default::default(),
2245 remove_outgoing_requests: Default::default(),
2246 },
2247 )
2248 .trace_err();
2249 }
2250 }
2251 }
2252 Ok(())
2253}
2254
2255async fn leave_room_for_session(session: &Session) -> Result<()> {
2256 let mut contacts_to_update = HashSet::default();
2257
2258 let room_id;
2259 let canceled_calls_to_user_ids;
2260 let live_kit_room;
2261 let delete_live_kit_room;
2262 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2263 contacts_to_update.insert(session.user_id);
2264
2265 for project in left_room.left_projects.values() {
2266 project_left(project, session);
2267 }
2268
2269 room_updated(&left_room.room, &session.peer);
2270 room_id = RoomId::from_proto(left_room.room.id);
2271 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2272 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2273 delete_live_kit_room = left_room.room.participants.is_empty();
2274 } else {
2275 return Ok(());
2276 }
2277
2278 {
2279 let pool = session.connection_pool().await;
2280 for canceled_user_id in canceled_calls_to_user_ids {
2281 for connection_id in pool.user_connection_ids(canceled_user_id) {
2282 session
2283 .peer
2284 .send(
2285 connection_id,
2286 proto::CallCanceled {
2287 room_id: room_id.to_proto(),
2288 },
2289 )
2290 .trace_err();
2291 }
2292 contacts_to_update.insert(canceled_user_id);
2293 }
2294 }
2295
2296 for contact_user_id in contacts_to_update {
2297 update_user_contacts(contact_user_id, &session).await?;
2298 }
2299
2300 if let Some(live_kit) = session.live_kit_client.as_ref() {
2301 live_kit
2302 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2303 .await
2304 .trace_err();
2305
2306 if delete_live_kit_room {
2307 live_kit.delete_room(live_kit_room).await.trace_err();
2308 }
2309 }
2310
2311 Ok(())
2312}
2313
2314fn project_left(project: &db::LeftProject, session: &Session) {
2315 for connection_id in &project.connection_ids {
2316 if project.host_user_id == session.user_id {
2317 session
2318 .peer
2319 .send(
2320 *connection_id,
2321 proto::UnshareProject {
2322 project_id: project.id.to_proto(),
2323 },
2324 )
2325 .trace_err();
2326 } else {
2327 session
2328 .peer
2329 .send(
2330 *connection_id,
2331 proto::RemoveProjectCollaborator {
2332 project_id: project.id.to_proto(),
2333 peer_id: Some(session.connection_id.into()),
2334 },
2335 )
2336 .trace_err();
2337 }
2338 }
2339}
2340
/// Extension trait for turning a fallible value into an `Option` without
/// propagating the failure to the caller.
pub trait ResultExt {
    /// The success type yielded inside `Some` by [`ResultExt::trace_err`].
    type Ok;

    /// Consumes `self` and returns the success value, if there is one.
    ///
    /// The implementation for `Result<T, E>` in this file logs the error with
    /// `tracing::error!` and returns `None` when `self` is an `Err`.
    fn trace_err(self) -> Option<Self::Ok>;
}
2346
2347impl<T, E> ResultExt for Result<T, E>
2348where
2349 E: std::fmt::Debug,
2350{
2351 type Ok = T;
2352
2353 fn trace_err(self) -> Option<T> {
2354 match self {
2355 Ok(value) => Some(value),
2356 Err(error) => {
2357 tracing::error!("{:?}", error);
2358 None
2359 }
2360 }
2361 }
2362}