1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
69
70const MESSAGE_COUNT_PER_PAGE: usize = 100;
71const MAX_MESSAGE_LEN: usize = 1024;
72
73lazy_static! {
74 static ref METRIC_CONNECTIONS: IntGauge =
75 register_int_gauge!("connections", "number of connections").unwrap();
76 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
77 "shared_projects",
78 "number of open projects with one or more guests"
79 )
80 .unwrap();
81}
82
83type MessageHandler =
84 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
86struct Response<R> {
87 peer: Arc<Peer>,
88 receipt: Receipt<R>,
89 responded: Arc<AtomicBool>,
90}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
100#[derive(Clone)]
101struct Session {
102 user_id: UserId,
103 connection_id: ConnectionId,
104 db: Arc<tokio::sync::Mutex<DbHandle>>,
105 peer: Arc<Peer>,
106 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
107 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
108 executor: Executor,
109}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
141struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
151pub struct Server {
152 id: parking_lot::Mutex<ServerId>,
153 peer: Arc<Peer>,
154 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
155 app_state: Arc<AppState>,
156 executor: Executor,
157 handlers: HashMap<TypeId, MessageHandler>,
158 teardown: watch::Sender<()>,
159}
160
161pub(crate) struct ConnectionPoolGuard<'a> {
162 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
163 _not_send: PhantomData<Rc<()>>,
164}
165
166#[derive(Serialize)]
167pub struct ServerSnapshot<'a> {
168 peer: &'a Peer,
169 #[serde(serialize_with = "serialize_deref")]
170 connection_pool: ConnectionPoolGuard<'a>,
171}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
183 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
184 let mut server = Self {
185 id: parking_lot::Mutex::new(id),
186 peer: Peer::new(id.0 as u32),
187 app_state,
188 executor,
189 connection_pool: Default::default(),
190 handlers: Default::default(),
191 teardown: watch::channel(()).0,
192 };
193
194 server
195 .add_request_handler(ping)
196 .add_request_handler(create_room)
197 .add_request_handler(join_room)
198 .add_request_handler(rejoin_room)
199 .add_request_handler(leave_room)
200 .add_request_handler(call)
201 .add_request_handler(cancel_call)
202 .add_message_handler(decline_call)
203 .add_request_handler(update_participant_location)
204 .add_request_handler(share_project)
205 .add_message_handler(unshare_project)
206 .add_request_handler(join_project)
207 .add_message_handler(leave_project)
208 .add_request_handler(update_project)
209 .add_request_handler(update_worktree)
210 .add_message_handler(start_language_server)
211 .add_message_handler(update_language_server)
212 .add_message_handler(update_diagnostic_summary)
213 .add_message_handler(update_worktree_settings)
214 .add_message_handler(refresh_inlay_hints)
215 .add_request_handler(forward_project_request::<proto::GetHover>)
216 .add_request_handler(forward_project_request::<proto::GetDefinition>)
217 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetReferences>)
219 .add_request_handler(forward_project_request::<proto::SearchProject>)
220 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
221 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
222 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
225 .add_request_handler(forward_project_request::<proto::GetCompletions>)
226 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
227 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
228 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
229 .add_request_handler(forward_project_request::<proto::PrepareRename>)
230 .add_request_handler(forward_project_request::<proto::PerformRename>)
231 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
232 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
233 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
234 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
235 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
240 .add_request_handler(forward_project_request::<proto::InlayHints>)
241 .add_message_handler(create_buffer_for_peer)
242 .add_request_handler(update_buffer)
243 .add_message_handler(update_buffer_file)
244 .add_message_handler(buffer_reloaded)
245 .add_message_handler(buffer_saved)
246 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
247 .add_request_handler(get_users)
248 .add_request_handler(fuzzy_search_users)
249 .add_request_handler(request_contact)
250 .add_request_handler(remove_contact)
251 .add_request_handler(respond_to_contact_request)
252 .add_request_handler(create_channel)
253 .add_request_handler(remove_channel)
254 .add_request_handler(invite_channel_member)
255 .add_request_handler(remove_channel_member)
256 .add_request_handler(set_channel_member_admin)
257 .add_request_handler(rename_channel)
258 .add_request_handler(join_channel_buffer)
259 .add_request_handler(leave_channel_buffer)
260 .add_message_handler(update_channel_buffer)
261 .add_request_handler(rejoin_channel_buffers)
262 .add_request_handler(get_channel_members)
263 .add_request_handler(respond_to_channel_invite)
264 .add_request_handler(join_channel)
265 .add_request_handler(join_channel_chat)
266 .add_message_handler(leave_channel_chat)
267 .add_request_handler(send_channel_message)
268 .add_request_handler(get_channel_messages)
269 .add_request_handler(follow)
270 .add_message_handler(unfollow)
271 .add_message_handler(update_followers)
272 .add_message_handler(update_diff_base)
273 .add_request_handler(get_private_user_info);
274
275 Arc::new(server)
276 }
277
278 pub async fn start(&self) -> Result<()> {
279 let server_id = *self.id.lock();
280 let app_state = self.app_state.clone();
281 let peer = self.peer.clone();
282 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
283 let pool = self.connection_pool.clone();
284 let live_kit_client = self.app_state.live_kit_client.clone();
285
286 let span = info_span!("start server");
287 self.executor.spawn_detached(
288 async move {
289 tracing::info!("waiting for cleanup timeout");
290 timeout.await;
291 tracing::info!("cleanup timeout expired, retrieving stale rooms");
292 if let Some((room_ids, channel_ids)) = app_state
293 .db
294 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
295 .await
296 .trace_err()
297 {
298 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
299 tracing::info!(
300 stale_channel_buffer_count = channel_ids.len(),
301 "retrieved stale channel buffers"
302 );
303
304 for channel_id in channel_ids {
305 if let Some(refreshed_channel_buffer) = app_state
306 .db
307 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
308 .await
309 .trace_err()
310 {
311 for connection_id in refreshed_channel_buffer.connection_ids {
312 for message in &refreshed_channel_buffer.removed_collaborators {
313 peer.send(connection_id, message.clone()).trace_err();
314 }
315 }
316 }
317 }
318
319 for room_id in room_ids {
320 let mut contacts_to_update = HashSet::default();
321 let mut canceled_calls_to_user_ids = Vec::new();
322 let mut live_kit_room = String::new();
323 let mut delete_live_kit_room = false;
324
325 if let Some(mut refreshed_room) = app_state
326 .db
327 .clear_stale_room_participants(room_id, server_id)
328 .await
329 .trace_err()
330 {
331 tracing::info!(
332 room_id = room_id.0,
333 new_participant_count = refreshed_room.room.participants.len(),
334 "refreshed room"
335 );
336 room_updated(&refreshed_room.room, &peer);
337 if let Some(channel_id) = refreshed_room.channel_id {
338 channel_updated(
339 channel_id,
340 &refreshed_room.room,
341 &refreshed_room.channel_members,
342 &peer,
343 &*pool.lock(),
344 );
345 }
346 contacts_to_update
347 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
348 contacts_to_update
349 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
350 canceled_calls_to_user_ids =
351 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
352 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
353 delete_live_kit_room = refreshed_room.room.participants.is_empty();
354 }
355
356 {
357 let pool = pool.lock();
358 for canceled_user_id in canceled_calls_to_user_ids {
359 for connection_id in pool.user_connection_ids(canceled_user_id) {
360 peer.send(
361 connection_id,
362 proto::CallCanceled {
363 room_id: room_id.to_proto(),
364 },
365 )
366 .trace_err();
367 }
368 }
369 }
370
371 for user_id in contacts_to_update {
372 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
373 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
374 if let Some((busy, contacts)) = busy.zip(contacts) {
375 let pool = pool.lock();
376 let updated_contact = contact_for_user(user_id, false, busy, &pool);
377 for contact in contacts {
378 if let db::Contact::Accepted {
379 user_id: contact_user_id,
380 ..
381 } = contact
382 {
383 for contact_conn_id in
384 pool.user_connection_ids(contact_user_id)
385 {
386 peer.send(
387 contact_conn_id,
388 proto::UpdateContacts {
389 contacts: vec![updated_contact.clone()],
390 remove_contacts: Default::default(),
391 incoming_requests: Default::default(),
392 remove_incoming_requests: Default::default(),
393 outgoing_requests: Default::default(),
394 remove_outgoing_requests: Default::default(),
395 },
396 )
397 .trace_err();
398 }
399 }
400 }
401 }
402 }
403
404 if let Some(live_kit) = live_kit_client.as_ref() {
405 if delete_live_kit_room {
406 live_kit.delete_room(live_kit_room).await.trace_err();
407 }
408 }
409 }
410 }
411
412 app_state
413 .db
414 .delete_stale_servers(&app_state.config.zed_environment, server_id)
415 .await
416 .trace_err();
417 }
418 .instrument(span),
419 );
420 Ok(())
421 }
422
423 pub fn teardown(&self) {
424 self.peer.teardown();
425 self.connection_pool.lock().reset();
426 let _ = self.teardown.send(());
427 }
428
429 #[cfg(test)]
430 pub fn reset(&self, id: ServerId) {
431 self.teardown();
432 *self.id.lock() = id;
433 self.peer.reset(id.0 as u32);
434 }
435
436 #[cfg(test)]
437 pub fn id(&self) -> ServerId {
438 *self.id.lock()
439 }
440
441 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
442 where
443 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
444 Fut: 'static + Send + Future<Output = Result<()>>,
445 M: EnvelopedMessage,
446 {
447 let prev_handler = self.handlers.insert(
448 TypeId::of::<M>(),
449 Box::new(move |envelope, session| {
450 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
451 let span = info_span!(
452 "handle message",
453 payload_type = envelope.payload_type_name()
454 );
455 span.in_scope(|| {
456 tracing::info!(
457 payload_type = envelope.payload_type_name(),
458 "message received"
459 );
460 });
461 let start_time = Instant::now();
462 let future = (handler)(*envelope, session);
463 async move {
464 let result = future.await;
465 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
466 match result {
467 Err(error) => {
468 tracing::error!(%error, ?duration_ms, "error handling message")
469 }
470 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
471 }
472 }
473 .instrument(span)
474 .boxed()
475 }),
476 );
477 if prev_handler.is_some() {
478 panic!("registered a handler for the same message twice");
479 }
480 self
481 }
482
483 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
484 where
485 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
486 Fut: 'static + Send + Future<Output = Result<()>>,
487 M: EnvelopedMessage,
488 {
489 self.add_handler(move |envelope, session| handler(envelope.payload, session));
490 self
491 }
492
493 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
494 where
495 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
496 Fut: Send + Future<Output = Result<()>>,
497 M: RequestMessage,
498 {
499 let handler = Arc::new(handler);
500 self.add_handler(move |envelope, session| {
501 let receipt = envelope.receipt();
502 let handler = handler.clone();
503 async move {
504 let peer = session.peer.clone();
505 let responded = Arc::new(AtomicBool::default());
506 let response = Response {
507 peer: peer.clone(),
508 responded: responded.clone(),
509 receipt,
510 };
511 match (handler)(envelope.payload, response, session).await {
512 Ok(()) => {
513 if responded.load(std::sync::atomic::Ordering::SeqCst) {
514 Ok(())
515 } else {
516 Err(anyhow!("handler did not send a response"))?
517 }
518 }
519 Err(error) => {
520 peer.respond_with_error(
521 receipt,
522 proto::Error {
523 message: error.to_string(),
524 },
525 )?;
526 Err(error)
527 }
528 }
529 }
530 })
531 }
532
533 pub fn handle_connection(
534 self: &Arc<Self>,
535 connection: Connection,
536 address: String,
537 user: User,
538 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
539 executor: Executor,
540 ) -> impl Future<Output = Result<()>> {
541 let this = self.clone();
542 let user_id = user.id;
543 let login = user.github_login;
544 let span = info_span!("handle connection", %user_id, %login, %address);
545 let mut teardown = self.teardown.subscribe();
546 async move {
547 let (connection_id, handle_io, mut incoming_rx) = this
548 .peer
549 .add_connection(connection, {
550 let executor = executor.clone();
551 move |duration| executor.sleep(duration)
552 });
553
554 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
555 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
556 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
557
558 if let Some(send_connection_id) = send_connection_id.take() {
559 let _ = send_connection_id.send(connection_id);
560 }
561
562 if !user.connected_once {
563 this.peer.send(connection_id, proto::ShowContacts {})?;
564 this.app_state.db.set_user_connected_once(user_id, true).await?;
565 }
566
567 let (contacts, channels_for_user, channel_invites) = future::try_join3(
568 this.app_state.db.get_contacts(user_id),
569 this.app_state.db.get_channels_for_user(user_id),
570 this.app_state.db.get_channel_invites_for_user(user_id)
571 ).await?;
572
573 {
574 let mut pool = this.connection_pool.lock();
575 pool.add_connection(connection_id, user_id, user.admin);
576 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
577 this.peer.send(connection_id, build_initial_channels_update(
578 channels_for_user,
579 channel_invites
580 ))?;
581 }
582
583 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
584 this.peer.send(connection_id, incoming_call)?;
585 }
586
587 let session = Session {
588 user_id,
589 connection_id,
590 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
591 peer: this.peer.clone(),
592 connection_pool: this.connection_pool.clone(),
593 live_kit_client: this.app_state.live_kit_client.clone(),
594 executor: executor.clone(),
595 };
596 update_user_contacts(user_id, &session).await?;
597
598 let handle_io = handle_io.fuse();
599 futures::pin_mut!(handle_io);
600
601 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
602 // This prevents deadlocks when e.g., client A performs a request to client B and
603 // client B performs a request to client A. If both clients stop processing further
604 // messages until their respective request completes, they won't have a chance to
605 // respond to the other client's request and cause a deadlock.
606 //
607 // This arrangement ensures we will attempt to process earlier messages first, but fall
608 // back to processing messages arrived later in the spirit of making progress.
609 let mut foreground_message_handlers = FuturesUnordered::new();
610 let concurrent_handlers = Arc::new(Semaphore::new(256));
611 loop {
612 let next_message = async {
613 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
614 let message = incoming_rx.next().await;
615 (permit, message)
616 }.fuse();
617 futures::pin_mut!(next_message);
618 futures::select_biased! {
619 _ = teardown.changed().fuse() => return Ok(()),
620 result = handle_io => {
621 if let Err(error) = result {
622 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
623 }
624 break;
625 }
626 _ = foreground_message_handlers.next() => {}
627 next_message = next_message => {
628 let (permit, message) = next_message;
629 if let Some(message) = message {
630 let type_name = message.payload_type_name();
631 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
632 let span_enter = span.enter();
633 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
634 let is_background = message.is_background();
635 let handle_message = (handler)(message, session.clone());
636 drop(span_enter);
637
638 let handle_message = async move {
639 handle_message.await;
640 drop(permit);
641 }.instrument(span);
642 if is_background {
643 executor.spawn_detached(handle_message);
644 } else {
645 foreground_message_handlers.push(handle_message);
646 }
647 } else {
648 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
649 }
650 } else {
651 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
652 break;
653 }
654 }
655 }
656 }
657
658 drop(foreground_message_handlers);
659 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
660 if let Err(error) = connection_lost(session, teardown, executor).await {
661 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
662 }
663
664 Ok(())
665 }.instrument(span)
666 }
667
668 pub async fn invite_code_redeemed(
669 self: &Arc<Self>,
670 inviter_id: UserId,
671 invitee_id: UserId,
672 ) -> Result<()> {
673 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
674 if let Some(code) = &user.invite_code {
675 let pool = self.connection_pool.lock();
676 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
677 for connection_id in pool.user_connection_ids(inviter_id) {
678 self.peer.send(
679 connection_id,
680 proto::UpdateContacts {
681 contacts: vec![invitee_contact.clone()],
682 ..Default::default()
683 },
684 )?;
685 self.peer.send(
686 connection_id,
687 proto::UpdateInviteInfo {
688 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
689 count: user.invite_count as u32,
690 },
691 )?;
692 }
693 }
694 }
695 Ok(())
696 }
697
698 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
699 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
700 if let Some(invite_code) = &user.invite_code {
701 let pool = self.connection_pool.lock();
702 for connection_id in pool.user_connection_ids(user_id) {
703 self.peer.send(
704 connection_id,
705 proto::UpdateInviteInfo {
706 url: format!(
707 "{}{}",
708 self.app_state.config.invite_link_prefix, invite_code
709 ),
710 count: user.invite_count as u32,
711 },
712 )?;
713 }
714 }
715 }
716 Ok(())
717 }
718
719 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
720 ServerSnapshot {
721 connection_pool: ConnectionPoolGuard {
722 guard: self.connection_pool.lock(),
723 _not_send: PhantomData,
724 },
725 peer: &self.peer,
726 }
727 }
728}
729
730impl<'a> Deref for ConnectionPoolGuard<'a> {
731 type Target = ConnectionPool;
732
733 fn deref(&self) -> &Self::Target {
734 &*self.guard
735 }
736}
737
738impl<'a> DerefMut for ConnectionPoolGuard<'a> {
739 fn deref_mut(&mut self) -> &mut Self::Target {
740 &mut *self.guard
741 }
742}
743
744impl<'a> Drop for ConnectionPoolGuard<'a> {
745 fn drop(&mut self) {
746 #[cfg(test)]
747 self.check_invariants();
748 }
749}
750
751fn broadcast<F>(
752 sender_id: Option<ConnectionId>,
753 receiver_ids: impl IntoIterator<Item = ConnectionId>,
754 mut f: F,
755) where
756 F: FnMut(ConnectionId) -> anyhow::Result<()>,
757{
758 for receiver_id in receiver_ids {
759 if Some(receiver_id) != sender_id {
760 if let Err(error) = f(receiver_id) {
761 tracing::error!("failed to send to {:?} {}", receiver_id, error);
762 }
763 }
764 }
765}
766
767lazy_static! {
768 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
769}
770
771pub struct ProtocolVersion(u32);
772
773impl Header for ProtocolVersion {
774 fn name() -> &'static HeaderName {
775 &ZED_PROTOCOL_VERSION
776 }
777
778 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
779 where
780 Self: Sized,
781 I: Iterator<Item = &'i axum::http::HeaderValue>,
782 {
783 let version = values
784 .next()
785 .ok_or_else(axum::headers::Error::invalid)?
786 .to_str()
787 .map_err(|_| axum::headers::Error::invalid())?
788 .parse()
789 .map_err(|_| axum::headers::Error::invalid())?;
790 Ok(Self(version))
791 }
792
793 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
794 values.extend([self.0.to_string().parse().unwrap()]);
795 }
796}
797
798pub fn routes(server: Arc<Server>) -> Router<Body> {
799 Router::new()
800 .route("/rpc", get(handle_websocket_request))
801 .layer(
802 ServiceBuilder::new()
803 .layer(Extension(server.app_state.clone()))
804 .layer(middleware::from_fn(auth::validate_header)),
805 )
806 .route("/metrics", get(handle_metrics))
807 .layer(Extension(server))
808}
809
810pub async fn handle_websocket_request(
811 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
812 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
813 Extension(server): Extension<Arc<Server>>,
814 Extension(user): Extension<User>,
815 ws: WebSocketUpgrade,
816) -> axum::response::Response {
817 if protocol_version != rpc::PROTOCOL_VERSION {
818 return (
819 StatusCode::UPGRADE_REQUIRED,
820 "client must be upgraded".to_string(),
821 )
822 .into_response();
823 }
824 let socket_address = socket_address.to_string();
825 ws.on_upgrade(move |socket| {
826 use util::ResultExt;
827 let socket = socket
828 .map_ok(to_tungstenite_message)
829 .err_into()
830 .with(|message| async move { Ok(to_axum_message(message)) });
831 let connection = Connection::new(Box::pin(socket));
832 async move {
833 server
834 .handle_connection(connection, socket_address, user, None, Executor::Production)
835 .await
836 .log_err();
837 }
838 })
839}
840
841pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
842 let connections = server
843 .connection_pool
844 .lock()
845 .connections()
846 .filter(|connection| !connection.admin)
847 .count();
848
849 METRIC_CONNECTIONS.set(connections as _);
850
851 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
852 METRIC_SHARED_PROJECTS.set(shared_projects as _);
853
854 let encoder = prometheus::TextEncoder::new();
855 let metric_families = prometheus::gather();
856 let encoded_metrics = encoder
857 .encode_to_string(&metric_families)
858 .map_err(|err| anyhow!("{}", err))?;
859 Ok(encoded_metrics)
860}
861
862#[instrument(err, skip(executor))]
863async fn connection_lost(
864 session: Session,
865 mut teardown: watch::Receiver<()>,
866 executor: Executor,
867) -> Result<()> {
868 session.peer.disconnect(session.connection_id);
869 session
870 .connection_pool()
871 .await
872 .remove_connection(session.connection_id)?;
873
874 session
875 .db()
876 .await
877 .connection_lost(session.connection_id)
878 .await
879 .trace_err();
880
881 futures::select_biased! {
882 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
883 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
884 leave_room_for_session(&session).await.trace_err();
885 leave_channel_buffers_for_session(&session)
886 .await
887 .trace_err();
888
889 if !session
890 .connection_pool()
891 .await
892 .is_user_online(session.user_id)
893 {
894 let db = session.db().await;
895 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
896 room_updated(&room, &session.peer);
897 }
898 }
899
900 update_user_contacts(session.user_id, &session).await?;
901 }
902 _ = teardown.changed().fuse() => {}
903 }
904
905 Ok(())
906}
907
908async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
909 response.send(proto::Ack {})?;
910 Ok(())
911}
912
913async fn create_room(
914 _request: proto::CreateRoom,
915 response: Response<proto::CreateRoom>,
916 session: Session,
917) -> Result<()> {
918 let live_kit_room = nanoid::nanoid!(30);
919
920 let live_kit_connection_info = {
921 let live_kit_room = live_kit_room.clone();
922 let live_kit = session.live_kit_client.as_ref();
923
924 util::async_iife!({
925 let live_kit = live_kit?;
926
927 live_kit
928 .create_room(live_kit_room.clone())
929 .await
930 .trace_err()?;
931
932 let token = live_kit
933 .room_token(&live_kit_room, &session.user_id.to_string())
934 .trace_err()?;
935
936 Some(proto::LiveKitConnectionInfo {
937 server_url: live_kit.url().into(),
938 token,
939 })
940 })
941 }
942 .await;
943
944 let room = session
945 .db()
946 .await
947 .create_room(session.user_id, session.connection_id, &live_kit_room)
948 .await?;
949
950 response.send(proto::CreateRoomResponse {
951 room: Some(room.clone()),
952 live_kit_connection_info,
953 })?;
954
955 update_user_contacts(session.user_id, &session).await?;
956 Ok(())
957}
958
959async fn join_room(
960 request: proto::JoinRoom,
961 response: Response<proto::JoinRoom>,
962 session: Session,
963) -> Result<()> {
964 let room_id = RoomId::from_proto(request.id);
965 let joined_room = {
966 let room = session
967 .db()
968 .await
969 .join_room(room_id, session.user_id, session.connection_id)
970 .await?;
971 room_updated(&room.room, &session.peer);
972 room.into_inner()
973 };
974
975 if let Some(channel_id) = joined_room.channel_id {
976 channel_updated(
977 channel_id,
978 &joined_room.room,
979 &joined_room.channel_members,
980 &session.peer,
981 &*session.connection_pool().await,
982 )
983 }
984
985 for connection_id in session
986 .connection_pool()
987 .await
988 .user_connection_ids(session.user_id)
989 {
990 session
991 .peer
992 .send(
993 connection_id,
994 proto::CallCanceled {
995 room_id: room_id.to_proto(),
996 },
997 )
998 .trace_err();
999 }
1000
1001 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1002 if let Some(token) = live_kit
1003 .room_token(
1004 &joined_room.room.live_kit_room,
1005 &session.user_id.to_string(),
1006 )
1007 .trace_err()
1008 {
1009 Some(proto::LiveKitConnectionInfo {
1010 server_url: live_kit.url().into(),
1011 token,
1012 })
1013 } else {
1014 None
1015 }
1016 } else {
1017 None
1018 };
1019
1020 response.send(proto::JoinRoomResponse {
1021 room: Some(joined_room.room),
1022 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1023 live_kit_connection_info,
1024 })?;
1025
1026 update_user_contacts(session.user_id, &session).await?;
1027 Ok(())
1028}
1029
1030async fn rejoin_room(
1031 request: proto::RejoinRoom,
1032 response: Response<proto::RejoinRoom>,
1033 session: Session,
1034) -> Result<()> {
1035 let room;
1036 let channel_id;
1037 let channel_members;
1038 {
1039 let mut rejoined_room = session
1040 .db()
1041 .await
1042 .rejoin_room(request, session.user_id, session.connection_id)
1043 .await?;
1044
1045 response.send(proto::RejoinRoomResponse {
1046 room: Some(rejoined_room.room.clone()),
1047 reshared_projects: rejoined_room
1048 .reshared_projects
1049 .iter()
1050 .map(|project| proto::ResharedProject {
1051 id: project.id.to_proto(),
1052 collaborators: project
1053 .collaborators
1054 .iter()
1055 .map(|collaborator| collaborator.to_proto())
1056 .collect(),
1057 })
1058 .collect(),
1059 rejoined_projects: rejoined_room
1060 .rejoined_projects
1061 .iter()
1062 .map(|rejoined_project| proto::RejoinedProject {
1063 id: rejoined_project.id.to_proto(),
1064 worktrees: rejoined_project
1065 .worktrees
1066 .iter()
1067 .map(|worktree| proto::WorktreeMetadata {
1068 id: worktree.id,
1069 root_name: worktree.root_name.clone(),
1070 visible: worktree.visible,
1071 abs_path: worktree.abs_path.clone(),
1072 })
1073 .collect(),
1074 collaborators: rejoined_project
1075 .collaborators
1076 .iter()
1077 .map(|collaborator| collaborator.to_proto())
1078 .collect(),
1079 language_servers: rejoined_project.language_servers.clone(),
1080 })
1081 .collect(),
1082 })?;
1083 room_updated(&rejoined_room.room, &session.peer);
1084
1085 for project in &rejoined_room.reshared_projects {
1086 for collaborator in &project.collaborators {
1087 session
1088 .peer
1089 .send(
1090 collaborator.connection_id,
1091 proto::UpdateProjectCollaborator {
1092 project_id: project.id.to_proto(),
1093 old_peer_id: Some(project.old_connection_id.into()),
1094 new_peer_id: Some(session.connection_id.into()),
1095 },
1096 )
1097 .trace_err();
1098 }
1099
1100 broadcast(
1101 Some(session.connection_id),
1102 project
1103 .collaborators
1104 .iter()
1105 .map(|collaborator| collaborator.connection_id),
1106 |connection_id| {
1107 session.peer.forward_send(
1108 session.connection_id,
1109 connection_id,
1110 proto::UpdateProject {
1111 project_id: project.id.to_proto(),
1112 worktrees: project.worktrees.clone(),
1113 },
1114 )
1115 },
1116 );
1117 }
1118
1119 for project in &rejoined_room.rejoined_projects {
1120 for collaborator in &project.collaborators {
1121 session
1122 .peer
1123 .send(
1124 collaborator.connection_id,
1125 proto::UpdateProjectCollaborator {
1126 project_id: project.id.to_proto(),
1127 old_peer_id: Some(project.old_connection_id.into()),
1128 new_peer_id: Some(session.connection_id.into()),
1129 },
1130 )
1131 .trace_err();
1132 }
1133 }
1134
1135 for project in &mut rejoined_room.rejoined_projects {
1136 for worktree in mem::take(&mut project.worktrees) {
1137 #[cfg(any(test, feature = "test-support"))]
1138 const MAX_CHUNK_SIZE: usize = 2;
1139 #[cfg(not(any(test, feature = "test-support")))]
1140 const MAX_CHUNK_SIZE: usize = 256;
1141
1142 // Stream this worktree's entries.
1143 let message = proto::UpdateWorktree {
1144 project_id: project.id.to_proto(),
1145 worktree_id: worktree.id,
1146 abs_path: worktree.abs_path.clone(),
1147 root_name: worktree.root_name,
1148 updated_entries: worktree.updated_entries,
1149 removed_entries: worktree.removed_entries,
1150 scan_id: worktree.scan_id,
1151 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1152 updated_repositories: worktree.updated_repositories,
1153 removed_repositories: worktree.removed_repositories,
1154 };
1155 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1156 session.peer.send(session.connection_id, update.clone())?;
1157 }
1158
1159 // Stream this worktree's diagnostics.
1160 for summary in worktree.diagnostic_summaries {
1161 session.peer.send(
1162 session.connection_id,
1163 proto::UpdateDiagnosticSummary {
1164 project_id: project.id.to_proto(),
1165 worktree_id: worktree.id,
1166 summary: Some(summary),
1167 },
1168 )?;
1169 }
1170
1171 for settings_file in worktree.settings_files {
1172 session.peer.send(
1173 session.connection_id,
1174 proto::UpdateWorktreeSettings {
1175 project_id: project.id.to_proto(),
1176 worktree_id: worktree.id,
1177 path: settings_file.path,
1178 content: Some(settings_file.content),
1179 },
1180 )?;
1181 }
1182 }
1183
1184 for language_server in &project.language_servers {
1185 session.peer.send(
1186 session.connection_id,
1187 proto::UpdateLanguageServer {
1188 project_id: project.id.to_proto(),
1189 language_server_id: language_server.id,
1190 variant: Some(
1191 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1192 proto::LspDiskBasedDiagnosticsUpdated {},
1193 ),
1194 ),
1195 },
1196 )?;
1197 }
1198 }
1199
1200 let rejoined_room = rejoined_room.into_inner();
1201
1202 room = rejoined_room.room;
1203 channel_id = rejoined_room.channel_id;
1204 channel_members = rejoined_room.channel_members;
1205 }
1206
1207 if let Some(channel_id) = channel_id {
1208 channel_updated(
1209 channel_id,
1210 &room,
1211 &channel_members,
1212 &session.peer,
1213 &*session.connection_pool().await,
1214 );
1215 }
1216
1217 update_user_contacts(session.user_id, &session).await?;
1218 Ok(())
1219}
1220
1221async fn leave_room(
1222 _: proto::LeaveRoom,
1223 response: Response<proto::LeaveRoom>,
1224 session: Session,
1225) -> Result<()> {
1226 leave_room_for_session(&session).await?;
1227 response.send(proto::Ack {})?;
1228 Ok(())
1229}
1230
1231async fn call(
1232 request: proto::Call,
1233 response: Response<proto::Call>,
1234 session: Session,
1235) -> Result<()> {
1236 let room_id = RoomId::from_proto(request.room_id);
1237 let calling_user_id = session.user_id;
1238 let calling_connection_id = session.connection_id;
1239 let called_user_id = UserId::from_proto(request.called_user_id);
1240 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1241 if !session
1242 .db()
1243 .await
1244 .has_contact(calling_user_id, called_user_id)
1245 .await?
1246 {
1247 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1248 }
1249
1250 let incoming_call = {
1251 let (room, incoming_call) = &mut *session
1252 .db()
1253 .await
1254 .call(
1255 room_id,
1256 calling_user_id,
1257 calling_connection_id,
1258 called_user_id,
1259 initial_project_id,
1260 )
1261 .await?;
1262 room_updated(&room, &session.peer);
1263 mem::take(incoming_call)
1264 };
1265 update_user_contacts(called_user_id, &session).await?;
1266
1267 let mut calls = session
1268 .connection_pool()
1269 .await
1270 .user_connection_ids(called_user_id)
1271 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1272 .collect::<FuturesUnordered<_>>();
1273
1274 while let Some(call_response) = calls.next().await {
1275 match call_response.as_ref() {
1276 Ok(_) => {
1277 response.send(proto::Ack {})?;
1278 return Ok(());
1279 }
1280 Err(_) => {
1281 call_response.trace_err();
1282 }
1283 }
1284 }
1285
1286 {
1287 let room = session
1288 .db()
1289 .await
1290 .call_failed(room_id, called_user_id)
1291 .await?;
1292 room_updated(&room, &session.peer);
1293 }
1294 update_user_contacts(called_user_id, &session).await?;
1295
1296 Err(anyhow!("failed to ring user"))?
1297}
1298
1299async fn cancel_call(
1300 request: proto::CancelCall,
1301 response: Response<proto::CancelCall>,
1302 session: Session,
1303) -> Result<()> {
1304 let called_user_id = UserId::from_proto(request.called_user_id);
1305 let room_id = RoomId::from_proto(request.room_id);
1306 {
1307 let room = session
1308 .db()
1309 .await
1310 .cancel_call(room_id, session.connection_id, called_user_id)
1311 .await?;
1312 room_updated(&room, &session.peer);
1313 }
1314
1315 for connection_id in session
1316 .connection_pool()
1317 .await
1318 .user_connection_ids(called_user_id)
1319 {
1320 session
1321 .peer
1322 .send(
1323 connection_id,
1324 proto::CallCanceled {
1325 room_id: room_id.to_proto(),
1326 },
1327 )
1328 .trace_err();
1329 }
1330 response.send(proto::Ack {})?;
1331
1332 update_user_contacts(called_user_id, &session).await?;
1333 Ok(())
1334}
1335
1336async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1337 let room_id = RoomId::from_proto(message.room_id);
1338 {
1339 let room = session
1340 .db()
1341 .await
1342 .decline_call(Some(room_id), session.user_id)
1343 .await?
1344 .ok_or_else(|| anyhow!("failed to decline call"))?;
1345 room_updated(&room, &session.peer);
1346 }
1347
1348 for connection_id in session
1349 .connection_pool()
1350 .await
1351 .user_connection_ids(session.user_id)
1352 {
1353 session
1354 .peer
1355 .send(
1356 connection_id,
1357 proto::CallCanceled {
1358 room_id: room_id.to_proto(),
1359 },
1360 )
1361 .trace_err();
1362 }
1363 update_user_contacts(session.user_id, &session).await?;
1364 Ok(())
1365}
1366
1367async fn update_participant_location(
1368 request: proto::UpdateParticipantLocation,
1369 response: Response<proto::UpdateParticipantLocation>,
1370 session: Session,
1371) -> Result<()> {
1372 let room_id = RoomId::from_proto(request.room_id);
1373 let location = request
1374 .location
1375 .ok_or_else(|| anyhow!("invalid location"))?;
1376
1377 let db = session.db().await;
1378 let room = db
1379 .update_room_participant_location(room_id, session.connection_id, location)
1380 .await?;
1381
1382 room_updated(&room, &session.peer);
1383 response.send(proto::Ack {})?;
1384 Ok(())
1385}
1386
1387async fn share_project(
1388 request: proto::ShareProject,
1389 response: Response<proto::ShareProject>,
1390 session: Session,
1391) -> Result<()> {
1392 let (project_id, room) = &*session
1393 .db()
1394 .await
1395 .share_project(
1396 RoomId::from_proto(request.room_id),
1397 session.connection_id,
1398 &request.worktrees,
1399 )
1400 .await?;
1401 response.send(proto::ShareProjectResponse {
1402 project_id: project_id.to_proto(),
1403 })?;
1404 room_updated(&room, &session.peer);
1405
1406 Ok(())
1407}
1408
1409async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1410 let project_id = ProjectId::from_proto(message.project_id);
1411
1412 let (room, guest_connection_ids) = &*session
1413 .db()
1414 .await
1415 .unshare_project(project_id, session.connection_id)
1416 .await?;
1417
1418 broadcast(
1419 Some(session.connection_id),
1420 guest_connection_ids.iter().copied(),
1421 |conn_id| session.peer.send(conn_id, message.clone()),
1422 );
1423 room_updated(&room, &session.peer);
1424
1425 Ok(())
1426}
1427
1428async fn join_project(
1429 request: proto::JoinProject,
1430 response: Response<proto::JoinProject>,
1431 session: Session,
1432) -> Result<()> {
1433 let project_id = ProjectId::from_proto(request.project_id);
1434 let guest_user_id = session.user_id;
1435
1436 tracing::info!(%project_id, "join project");
1437
1438 let (project, replica_id) = &mut *session
1439 .db()
1440 .await
1441 .join_project(project_id, session.connection_id)
1442 .await?;
1443
1444 let collaborators = project
1445 .collaborators
1446 .iter()
1447 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1448 .map(|collaborator| collaborator.to_proto())
1449 .collect::<Vec<_>>();
1450
1451 let worktrees = project
1452 .worktrees
1453 .iter()
1454 .map(|(id, worktree)| proto::WorktreeMetadata {
1455 id: *id,
1456 root_name: worktree.root_name.clone(),
1457 visible: worktree.visible,
1458 abs_path: worktree.abs_path.clone(),
1459 })
1460 .collect::<Vec<_>>();
1461
1462 for collaborator in &collaborators {
1463 session
1464 .peer
1465 .send(
1466 collaborator.peer_id.unwrap().into(),
1467 proto::AddProjectCollaborator {
1468 project_id: project_id.to_proto(),
1469 collaborator: Some(proto::Collaborator {
1470 peer_id: Some(session.connection_id.into()),
1471 replica_id: replica_id.0 as u32,
1472 user_id: guest_user_id.to_proto(),
1473 }),
1474 },
1475 )
1476 .trace_err();
1477 }
1478
1479 // First, we send the metadata associated with each worktree.
1480 response.send(proto::JoinProjectResponse {
1481 worktrees: worktrees.clone(),
1482 replica_id: replica_id.0 as u32,
1483 collaborators: collaborators.clone(),
1484 language_servers: project.language_servers.clone(),
1485 })?;
1486
1487 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1488 #[cfg(any(test, feature = "test-support"))]
1489 const MAX_CHUNK_SIZE: usize = 2;
1490 #[cfg(not(any(test, feature = "test-support")))]
1491 const MAX_CHUNK_SIZE: usize = 256;
1492
1493 // Stream this worktree's entries.
1494 let message = proto::UpdateWorktree {
1495 project_id: project_id.to_proto(),
1496 worktree_id,
1497 abs_path: worktree.abs_path.clone(),
1498 root_name: worktree.root_name,
1499 updated_entries: worktree.entries,
1500 removed_entries: Default::default(),
1501 scan_id: worktree.scan_id,
1502 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1503 updated_repositories: worktree.repository_entries.into_values().collect(),
1504 removed_repositories: Default::default(),
1505 };
1506 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1507 session.peer.send(session.connection_id, update.clone())?;
1508 }
1509
1510 // Stream this worktree's diagnostics.
1511 for summary in worktree.diagnostic_summaries {
1512 session.peer.send(
1513 session.connection_id,
1514 proto::UpdateDiagnosticSummary {
1515 project_id: project_id.to_proto(),
1516 worktree_id: worktree.id,
1517 summary: Some(summary),
1518 },
1519 )?;
1520 }
1521
1522 for settings_file in worktree.settings_files {
1523 session.peer.send(
1524 session.connection_id,
1525 proto::UpdateWorktreeSettings {
1526 project_id: project_id.to_proto(),
1527 worktree_id: worktree.id,
1528 path: settings_file.path,
1529 content: Some(settings_file.content),
1530 },
1531 )?;
1532 }
1533 }
1534
1535 for language_server in &project.language_servers {
1536 session.peer.send(
1537 session.connection_id,
1538 proto::UpdateLanguageServer {
1539 project_id: project_id.to_proto(),
1540 language_server_id: language_server.id,
1541 variant: Some(
1542 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1543 proto::LspDiskBasedDiagnosticsUpdated {},
1544 ),
1545 ),
1546 },
1547 )?;
1548 }
1549
1550 Ok(())
1551}
1552
1553async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1554 let sender_id = session.connection_id;
1555 let project_id = ProjectId::from_proto(request.project_id);
1556
1557 let (room, project) = &*session
1558 .db()
1559 .await
1560 .leave_project(project_id, sender_id)
1561 .await?;
1562 tracing::info!(
1563 %project_id,
1564 host_user_id = %project.host_user_id,
1565 host_connection_id = %project.host_connection_id,
1566 "leave project"
1567 );
1568
1569 project_left(&project, &session);
1570 room_updated(&room, &session.peer);
1571
1572 Ok(())
1573}
1574
1575async fn update_project(
1576 request: proto::UpdateProject,
1577 response: Response<proto::UpdateProject>,
1578 session: Session,
1579) -> Result<()> {
1580 let project_id = ProjectId::from_proto(request.project_id);
1581 let (room, guest_connection_ids) = &*session
1582 .db()
1583 .await
1584 .update_project(project_id, session.connection_id, &request.worktrees)
1585 .await?;
1586 broadcast(
1587 Some(session.connection_id),
1588 guest_connection_ids.iter().copied(),
1589 |connection_id| {
1590 session
1591 .peer
1592 .forward_send(session.connection_id, connection_id, request.clone())
1593 },
1594 );
1595 room_updated(&room, &session.peer);
1596 response.send(proto::Ack {})?;
1597
1598 Ok(())
1599}
1600
1601async fn update_worktree(
1602 request: proto::UpdateWorktree,
1603 response: Response<proto::UpdateWorktree>,
1604 session: Session,
1605) -> Result<()> {
1606 let guest_connection_ids = session
1607 .db()
1608 .await
1609 .update_worktree(&request, session.connection_id)
1610 .await?;
1611
1612 broadcast(
1613 Some(session.connection_id),
1614 guest_connection_ids.iter().copied(),
1615 |connection_id| {
1616 session
1617 .peer
1618 .forward_send(session.connection_id, connection_id, request.clone())
1619 },
1620 );
1621 response.send(proto::Ack {})?;
1622 Ok(())
1623}
1624
1625async fn update_diagnostic_summary(
1626 message: proto::UpdateDiagnosticSummary,
1627 session: Session,
1628) -> Result<()> {
1629 let guest_connection_ids = session
1630 .db()
1631 .await
1632 .update_diagnostic_summary(&message, session.connection_id)
1633 .await?;
1634
1635 broadcast(
1636 Some(session.connection_id),
1637 guest_connection_ids.iter().copied(),
1638 |connection_id| {
1639 session
1640 .peer
1641 .forward_send(session.connection_id, connection_id, message.clone())
1642 },
1643 );
1644
1645 Ok(())
1646}
1647
1648async fn update_worktree_settings(
1649 message: proto::UpdateWorktreeSettings,
1650 session: Session,
1651) -> Result<()> {
1652 let guest_connection_ids = session
1653 .db()
1654 .await
1655 .update_worktree_settings(&message, session.connection_id)
1656 .await?;
1657
1658 broadcast(
1659 Some(session.connection_id),
1660 guest_connection_ids.iter().copied(),
1661 |connection_id| {
1662 session
1663 .peer
1664 .forward_send(session.connection_id, connection_id, message.clone())
1665 },
1666 );
1667
1668 Ok(())
1669}
1670
1671async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1672 broadcast_project_message(request.project_id, request, session).await
1673}
1674
1675async fn start_language_server(
1676 request: proto::StartLanguageServer,
1677 session: Session,
1678) -> Result<()> {
1679 let guest_connection_ids = session
1680 .db()
1681 .await
1682 .start_language_server(&request, session.connection_id)
1683 .await?;
1684
1685 broadcast(
1686 Some(session.connection_id),
1687 guest_connection_ids.iter().copied(),
1688 |connection_id| {
1689 session
1690 .peer
1691 .forward_send(session.connection_id, connection_id, request.clone())
1692 },
1693 );
1694 Ok(())
1695}
1696
1697async fn update_language_server(
1698 request: proto::UpdateLanguageServer,
1699 session: Session,
1700) -> Result<()> {
1701 session.executor.record_backtrace();
1702 let project_id = ProjectId::from_proto(request.project_id);
1703 let project_connection_ids = session
1704 .db()
1705 .await
1706 .project_connection_ids(project_id, session.connection_id)
1707 .await?;
1708 broadcast(
1709 Some(session.connection_id),
1710 project_connection_ids.iter().copied(),
1711 |connection_id| {
1712 session
1713 .peer
1714 .forward_send(session.connection_id, connection_id, request.clone())
1715 },
1716 );
1717 Ok(())
1718}
1719
1720async fn forward_project_request<T>(
1721 request: T,
1722 response: Response<T>,
1723 session: Session,
1724) -> Result<()>
1725where
1726 T: EntityMessage + RequestMessage,
1727{
1728 session.executor.record_backtrace();
1729 let project_id = ProjectId::from_proto(request.remote_entity_id());
1730 let host_connection_id = {
1731 let collaborators = session
1732 .db()
1733 .await
1734 .project_collaborators(project_id, session.connection_id)
1735 .await?;
1736 collaborators
1737 .iter()
1738 .find(|collaborator| collaborator.is_host)
1739 .ok_or_else(|| anyhow!("host not found"))?
1740 .connection_id
1741 };
1742
1743 let payload = session
1744 .peer
1745 .forward_request(session.connection_id, host_connection_id, request)
1746 .await?;
1747
1748 response.send(payload)?;
1749 Ok(())
1750}
1751
1752async fn create_buffer_for_peer(
1753 request: proto::CreateBufferForPeer,
1754 session: Session,
1755) -> Result<()> {
1756 session.executor.record_backtrace();
1757 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1758 session
1759 .peer
1760 .forward_send(session.connection_id, peer_id.into(), request)?;
1761 Ok(())
1762}
1763
1764async fn update_buffer(
1765 request: proto::UpdateBuffer,
1766 response: Response<proto::UpdateBuffer>,
1767 session: Session,
1768) -> Result<()> {
1769 session.executor.record_backtrace();
1770 let project_id = ProjectId::from_proto(request.project_id);
1771 let mut guest_connection_ids;
1772 let mut host_connection_id = None;
1773 {
1774 let collaborators = session
1775 .db()
1776 .await
1777 .project_collaborators(project_id, session.connection_id)
1778 .await?;
1779 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1780 for collaborator in collaborators.iter() {
1781 if collaborator.is_host {
1782 host_connection_id = Some(collaborator.connection_id);
1783 } else {
1784 guest_connection_ids.push(collaborator.connection_id);
1785 }
1786 }
1787 }
1788 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1789
1790 session.executor.record_backtrace();
1791 broadcast(
1792 Some(session.connection_id),
1793 guest_connection_ids,
1794 |connection_id| {
1795 session
1796 .peer
1797 .forward_send(session.connection_id, connection_id, request.clone())
1798 },
1799 );
1800 if host_connection_id != session.connection_id {
1801 session
1802 .peer
1803 .forward_request(session.connection_id, host_connection_id, request.clone())
1804 .await?;
1805 }
1806
1807 response.send(proto::Ack {})?;
1808 Ok(())
1809}
1810
1811async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1812 let project_id = ProjectId::from_proto(request.project_id);
1813 let project_connection_ids = session
1814 .db()
1815 .await
1816 .project_connection_ids(project_id, session.connection_id)
1817 .await?;
1818
1819 broadcast(
1820 Some(session.connection_id),
1821 project_connection_ids.iter().copied(),
1822 |connection_id| {
1823 session
1824 .peer
1825 .forward_send(session.connection_id, connection_id, request.clone())
1826 },
1827 );
1828 Ok(())
1829}
1830
1831async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1832 let project_id = ProjectId::from_proto(request.project_id);
1833 let project_connection_ids = session
1834 .db()
1835 .await
1836 .project_connection_ids(project_id, session.connection_id)
1837 .await?;
1838 broadcast(
1839 Some(session.connection_id),
1840 project_connection_ids.iter().copied(),
1841 |connection_id| {
1842 session
1843 .peer
1844 .forward_send(session.connection_id, connection_id, request.clone())
1845 },
1846 );
1847 Ok(())
1848}
1849
1850async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1851 broadcast_project_message(request.project_id, request, session).await
1852}
1853
1854async fn broadcast_project_message<T: EnvelopedMessage>(
1855 project_id: u64,
1856 request: T,
1857 session: Session,
1858) -> Result<()> {
1859 let project_id = ProjectId::from_proto(project_id);
1860 let project_connection_ids = session
1861 .db()
1862 .await
1863 .project_connection_ids(project_id, session.connection_id)
1864 .await?;
1865 broadcast(
1866 Some(session.connection_id),
1867 project_connection_ids.iter().copied(),
1868 |connection_id| {
1869 session
1870 .peer
1871 .forward_send(session.connection_id, connection_id, request.clone())
1872 },
1873 );
1874 Ok(())
1875}
1876
1877async fn follow(
1878 request: proto::Follow,
1879 response: Response<proto::Follow>,
1880 session: Session,
1881) -> Result<()> {
1882 let project_id = ProjectId::from_proto(request.project_id);
1883 let leader_id = request
1884 .leader_id
1885 .ok_or_else(|| anyhow!("invalid leader id"))?
1886 .into();
1887 let follower_id = session.connection_id;
1888
1889 {
1890 let project_connection_ids = session
1891 .db()
1892 .await
1893 .project_connection_ids(project_id, session.connection_id)
1894 .await?;
1895
1896 if !project_connection_ids.contains(&leader_id) {
1897 Err(anyhow!("no such peer"))?;
1898 }
1899 }
1900
1901 let mut response_payload = session
1902 .peer
1903 .forward_request(session.connection_id, leader_id, request)
1904 .await?;
1905 response_payload
1906 .views
1907 .retain(|view| view.leader_id != Some(follower_id.into()));
1908 response.send(response_payload)?;
1909
1910 let room = session
1911 .db()
1912 .await
1913 .follow(project_id, leader_id, follower_id)
1914 .await?;
1915 room_updated(&room, &session.peer);
1916
1917 Ok(())
1918}
1919
1920async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1921 let project_id = ProjectId::from_proto(request.project_id);
1922 let leader_id = request
1923 .leader_id
1924 .ok_or_else(|| anyhow!("invalid leader id"))?
1925 .into();
1926 let follower_id = session.connection_id;
1927
1928 if !session
1929 .db()
1930 .await
1931 .project_connection_ids(project_id, session.connection_id)
1932 .await?
1933 .contains(&leader_id)
1934 {
1935 Err(anyhow!("no such peer"))?;
1936 }
1937
1938 session
1939 .peer
1940 .forward_send(session.connection_id, leader_id, request)?;
1941
1942 let room = session
1943 .db()
1944 .await
1945 .unfollow(project_id, leader_id, follower_id)
1946 .await?;
1947 room_updated(&room, &session.peer);
1948
1949 Ok(())
1950}
1951
1952async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1953 let project_id = ProjectId::from_proto(request.project_id);
1954 let project_connection_ids = session
1955 .db
1956 .lock()
1957 .await
1958 .project_connection_ids(project_id, session.connection_id)
1959 .await?;
1960
1961 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1962 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1963 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1964 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1965 });
1966 for follower_peer_id in request.follower_ids.iter().copied() {
1967 let follower_connection_id = follower_peer_id.into();
1968 if project_connection_ids.contains(&follower_connection_id)
1969 && Some(follower_peer_id) != leader_id
1970 {
1971 session.peer.forward_send(
1972 session.connection_id,
1973 follower_connection_id,
1974 request.clone(),
1975 )?;
1976 }
1977 }
1978 Ok(())
1979}
1980
1981async fn get_users(
1982 request: proto::GetUsers,
1983 response: Response<proto::GetUsers>,
1984 session: Session,
1985) -> Result<()> {
1986 let user_ids = request
1987 .user_ids
1988 .into_iter()
1989 .map(UserId::from_proto)
1990 .collect();
1991 let users = session
1992 .db()
1993 .await
1994 .get_users_by_ids(user_ids)
1995 .await?
1996 .into_iter()
1997 .map(|user| proto::User {
1998 id: user.id.to_proto(),
1999 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2000 github_login: user.github_login,
2001 })
2002 .collect();
2003 response.send(proto::UsersResponse { users })?;
2004 Ok(())
2005}
2006
2007async fn fuzzy_search_users(
2008 request: proto::FuzzySearchUsers,
2009 response: Response<proto::FuzzySearchUsers>,
2010 session: Session,
2011) -> Result<()> {
2012 let query = request.query;
2013 let users = match query.len() {
2014 0 => vec![],
2015 1 | 2 => session
2016 .db()
2017 .await
2018 .get_user_by_github_login(&query)
2019 .await?
2020 .into_iter()
2021 .collect(),
2022 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2023 };
2024 let users = users
2025 .into_iter()
2026 .filter(|user| user.id != session.user_id)
2027 .map(|user| proto::User {
2028 id: user.id.to_proto(),
2029 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2030 github_login: user.github_login,
2031 })
2032 .collect();
2033 response.send(proto::UsersResponse { users })?;
2034 Ok(())
2035}
2036
2037async fn request_contact(
2038 request: proto::RequestContact,
2039 response: Response<proto::RequestContact>,
2040 session: Session,
2041) -> Result<()> {
2042 let requester_id = session.user_id;
2043 let responder_id = UserId::from_proto(request.responder_id);
2044 if requester_id == responder_id {
2045 return Err(anyhow!("cannot add yourself as a contact"))?;
2046 }
2047
2048 session
2049 .db()
2050 .await
2051 .send_contact_request(requester_id, responder_id)
2052 .await?;
2053
2054 // Update outgoing contact requests of requester
2055 let mut update = proto::UpdateContacts::default();
2056 update.outgoing_requests.push(responder_id.to_proto());
2057 for connection_id in session
2058 .connection_pool()
2059 .await
2060 .user_connection_ids(requester_id)
2061 {
2062 session.peer.send(connection_id, update.clone())?;
2063 }
2064
2065 // Update incoming contact requests of responder
2066 let mut update = proto::UpdateContacts::default();
2067 update
2068 .incoming_requests
2069 .push(proto::IncomingContactRequest {
2070 requester_id: requester_id.to_proto(),
2071 should_notify: true,
2072 });
2073 for connection_id in session
2074 .connection_pool()
2075 .await
2076 .user_connection_ids(responder_id)
2077 {
2078 session.peer.send(connection_id, update.clone())?;
2079 }
2080
2081 response.send(proto::Ack {})?;
2082 Ok(())
2083}
2084
2085async fn respond_to_contact_request(
2086 request: proto::RespondToContactRequest,
2087 response: Response<proto::RespondToContactRequest>,
2088 session: Session,
2089) -> Result<()> {
2090 let responder_id = session.user_id;
2091 let requester_id = UserId::from_proto(request.requester_id);
2092 let db = session.db().await;
2093 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2094 db.dismiss_contact_notification(responder_id, requester_id)
2095 .await?;
2096 } else {
2097 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2098
2099 db.respond_to_contact_request(responder_id, requester_id, accept)
2100 .await?;
2101 let requester_busy = db.is_user_busy(requester_id).await?;
2102 let responder_busy = db.is_user_busy(responder_id).await?;
2103
2104 let pool = session.connection_pool().await;
2105 // Update responder with new contact
2106 let mut update = proto::UpdateContacts::default();
2107 if accept {
2108 update
2109 .contacts
2110 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2111 }
2112 update
2113 .remove_incoming_requests
2114 .push(requester_id.to_proto());
2115 for connection_id in pool.user_connection_ids(responder_id) {
2116 session.peer.send(connection_id, update.clone())?;
2117 }
2118
2119 // Update requester with new contact
2120 let mut update = proto::UpdateContacts::default();
2121 if accept {
2122 update
2123 .contacts
2124 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2125 }
2126 update
2127 .remove_outgoing_requests
2128 .push(responder_id.to_proto());
2129 for connection_id in pool.user_connection_ids(requester_id) {
2130 session.peer.send(connection_id, update.clone())?;
2131 }
2132 }
2133
2134 response.send(proto::Ack {})?;
2135 Ok(())
2136}
2137
2138async fn remove_contact(
2139 request: proto::RemoveContact,
2140 response: Response<proto::RemoveContact>,
2141 session: Session,
2142) -> Result<()> {
2143 let requester_id = session.user_id;
2144 let responder_id = UserId::from_proto(request.user_id);
2145 let db = session.db().await;
2146 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2147
2148 let pool = session.connection_pool().await;
2149 // Update outgoing contact requests of requester
2150 let mut update = proto::UpdateContacts::default();
2151 if contact_accepted {
2152 update.remove_contacts.push(responder_id.to_proto());
2153 } else {
2154 update
2155 .remove_outgoing_requests
2156 .push(responder_id.to_proto());
2157 }
2158 for connection_id in pool.user_connection_ids(requester_id) {
2159 session.peer.send(connection_id, update.clone())?;
2160 }
2161
2162 // Update incoming contact requests of responder
2163 let mut update = proto::UpdateContacts::default();
2164 if contact_accepted {
2165 update.remove_contacts.push(requester_id.to_proto());
2166 } else {
2167 update
2168 .remove_incoming_requests
2169 .push(requester_id.to_proto());
2170 }
2171 for connection_id in pool.user_connection_ids(responder_id) {
2172 session.peer.send(connection_id, update.clone())?;
2173 }
2174
2175 response.send(proto::Ack {})?;
2176 Ok(())
2177}
2178
2179async fn create_channel(
2180 request: proto::CreateChannel,
2181 response: Response<proto::CreateChannel>,
2182 session: Session,
2183) -> Result<()> {
2184 let db = session.db().await;
2185 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2186
2187 if let Some(live_kit) = session.live_kit_client.as_ref() {
2188 live_kit.create_room(live_kit_room.clone()).await?;
2189 }
2190
2191 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2192 let id = db
2193 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2194 .await?;
2195
2196 let channel = proto::Channel {
2197 id: id.to_proto(),
2198 name: request.name,
2199 parent_id: request.parent_id,
2200 };
2201
2202 response.send(proto::ChannelResponse {
2203 channel: Some(channel.clone()),
2204 })?;
2205
2206 let mut update = proto::UpdateChannels::default();
2207 update.channels.push(channel);
2208
2209 let user_ids_to_notify = if let Some(parent_id) = parent_id {
2210 db.get_channel_members(parent_id).await?
2211 } else {
2212 vec![session.user_id]
2213 };
2214
2215 let connection_pool = session.connection_pool().await;
2216 for user_id in user_ids_to_notify {
2217 for connection_id in connection_pool.user_connection_ids(user_id) {
2218 let mut update = update.clone();
2219 if user_id == session.user_id {
2220 update.channel_permissions.push(proto::ChannelPermission {
2221 channel_id: id.to_proto(),
2222 is_admin: true,
2223 });
2224 }
2225 session.peer.send(connection_id, update)?;
2226 }
2227 }
2228
2229 Ok(())
2230}
2231
2232async fn remove_channel(
2233 request: proto::RemoveChannel,
2234 response: Response<proto::RemoveChannel>,
2235 session: Session,
2236) -> Result<()> {
2237 let db = session.db().await;
2238
2239 let channel_id = request.channel_id;
2240 let (removed_channels, member_ids) = db
2241 .remove_channel(ChannelId::from_proto(channel_id), session.user_id)
2242 .await?;
2243 response.send(proto::Ack {})?;
2244
2245 // Notify members of removed channels
2246 let mut update = proto::UpdateChannels::default();
2247 update
2248 .remove_channels
2249 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2250
2251 let connection_pool = session.connection_pool().await;
2252 for member_id in member_ids {
2253 for connection_id in connection_pool.user_connection_ids(member_id) {
2254 session.peer.send(connection_id, update.clone())?;
2255 }
2256 }
2257
2258 Ok(())
2259}
2260
2261async fn invite_channel_member(
2262 request: proto::InviteChannelMember,
2263 response: Response<proto::InviteChannelMember>,
2264 session: Session,
2265) -> Result<()> {
2266 let db = session.db().await;
2267 let channel_id = ChannelId::from_proto(request.channel_id);
2268 let invitee_id = UserId::from_proto(request.user_id);
2269 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2270 .await?;
2271
2272 let (channel, _) = db
2273 .get_channel(channel_id, session.user_id)
2274 .await?
2275 .ok_or_else(|| anyhow!("channel not found"))?;
2276
2277 let mut update = proto::UpdateChannels::default();
2278 update.channel_invitations.push(proto::Channel {
2279 id: channel.id.to_proto(),
2280 name: channel.name,
2281 parent_id: None,
2282 });
2283 for connection_id in session
2284 .connection_pool()
2285 .await
2286 .user_connection_ids(invitee_id)
2287 {
2288 session.peer.send(connection_id, update.clone())?;
2289 }
2290
2291 response.send(proto::Ack {})?;
2292 Ok(())
2293}
2294
2295async fn remove_channel_member(
2296 request: proto::RemoveChannelMember,
2297 response: Response<proto::RemoveChannelMember>,
2298 session: Session,
2299) -> Result<()> {
2300 let db = session.db().await;
2301 let channel_id = ChannelId::from_proto(request.channel_id);
2302 let member_id = UserId::from_proto(request.user_id);
2303
2304 db.remove_channel_member(channel_id, member_id, session.user_id)
2305 .await?;
2306
2307 let mut update = proto::UpdateChannels::default();
2308 update.remove_channels.push(channel_id.to_proto());
2309
2310 for connection_id in session
2311 .connection_pool()
2312 .await
2313 .user_connection_ids(member_id)
2314 {
2315 session.peer.send(connection_id, update.clone())?;
2316 }
2317
2318 response.send(proto::Ack {})?;
2319 Ok(())
2320}
2321
2322async fn set_channel_member_admin(
2323 request: proto::SetChannelMemberAdmin,
2324 response: Response<proto::SetChannelMemberAdmin>,
2325 session: Session,
2326) -> Result<()> {
2327 let db = session.db().await;
2328 let channel_id = ChannelId::from_proto(request.channel_id);
2329 let member_id = UserId::from_proto(request.user_id);
2330 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2331 .await?;
2332
2333 let (channel, has_accepted) = db
2334 .get_channel(channel_id, member_id)
2335 .await?
2336 .ok_or_else(|| anyhow!("channel not found"))?;
2337
2338 let mut update = proto::UpdateChannels::default();
2339 if has_accepted {
2340 update.channel_permissions.push(proto::ChannelPermission {
2341 channel_id: channel.id.to_proto(),
2342 is_admin: request.admin,
2343 });
2344 }
2345
2346 for connection_id in session
2347 .connection_pool()
2348 .await
2349 .user_connection_ids(member_id)
2350 {
2351 session.peer.send(connection_id, update.clone())?;
2352 }
2353
2354 response.send(proto::Ack {})?;
2355 Ok(())
2356}
2357
2358async fn rename_channel(
2359 request: proto::RenameChannel,
2360 response: Response<proto::RenameChannel>,
2361 session: Session,
2362) -> Result<()> {
2363 let db = session.db().await;
2364 let channel_id = ChannelId::from_proto(request.channel_id);
2365 let new_name = db
2366 .rename_channel(channel_id, session.user_id, &request.name)
2367 .await?;
2368
2369 let channel = proto::Channel {
2370 id: request.channel_id,
2371 name: new_name,
2372 parent_id: None,
2373 };
2374 response.send(proto::ChannelResponse {
2375 channel: Some(channel.clone()),
2376 })?;
2377 let mut update = proto::UpdateChannels::default();
2378 update.channels.push(channel);
2379
2380 let member_ids = db.get_channel_members(channel_id).await?;
2381
2382 let connection_pool = session.connection_pool().await;
2383 for member_id in member_ids {
2384 for connection_id in connection_pool.user_connection_ids(member_id) {
2385 session.peer.send(connection_id, update.clone())?;
2386 }
2387 }
2388
2389 Ok(())
2390}
2391
2392async fn get_channel_members(
2393 request: proto::GetChannelMembers,
2394 response: Response<proto::GetChannelMembers>,
2395 session: Session,
2396) -> Result<()> {
2397 let db = session.db().await;
2398 let channel_id = ChannelId::from_proto(request.channel_id);
2399 let members = db
2400 .get_channel_member_details(channel_id, session.user_id)
2401 .await?;
2402 response.send(proto::GetChannelMembersResponse { members })?;
2403 Ok(())
2404}
2405
2406async fn respond_to_channel_invite(
2407 request: proto::RespondToChannelInvite,
2408 response: Response<proto::RespondToChannelInvite>,
2409 session: Session,
2410) -> Result<()> {
2411 let db = session.db().await;
2412 let channel_id = ChannelId::from_proto(request.channel_id);
2413 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2414 .await?;
2415
2416 let mut update = proto::UpdateChannels::default();
2417 update
2418 .remove_channel_invitations
2419 .push(channel_id.to_proto());
2420 if request.accept {
2421 let result = db.get_channels_for_user(session.user_id).await?;
2422 update
2423 .channels
2424 .extend(result.channels.into_iter().map(|channel| proto::Channel {
2425 id: channel.id.to_proto(),
2426 name: channel.name,
2427 parent_id: channel.parent_id.map(ChannelId::to_proto),
2428 }));
2429 update
2430 .channel_participants
2431 .extend(
2432 result
2433 .channel_participants
2434 .into_iter()
2435 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2436 channel_id: channel_id.to_proto(),
2437 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2438 }),
2439 );
2440 update
2441 .channel_permissions
2442 .extend(
2443 result
2444 .channels_with_admin_privileges
2445 .into_iter()
2446 .map(|channel_id| proto::ChannelPermission {
2447 channel_id: channel_id.to_proto(),
2448 is_admin: true,
2449 }),
2450 );
2451 }
2452 session.peer.send(session.connection_id, update)?;
2453 response.send(proto::Ack {})?;
2454
2455 Ok(())
2456}
2457
2458async fn join_channel(
2459 request: proto::JoinChannel,
2460 response: Response<proto::JoinChannel>,
2461 session: Session,
2462) -> Result<()> {
2463 let channel_id = ChannelId::from_proto(request.channel_id);
2464
2465 let joined_room = {
2466 leave_room_for_session(&session).await?;
2467 let db = session.db().await;
2468
2469 let room_id = db.room_id_for_channel(channel_id).await?;
2470
2471 let joined_room = db
2472 .join_room(room_id, session.user_id, session.connection_id)
2473 .await?;
2474
2475 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2476 let token = live_kit
2477 .room_token(
2478 &joined_room.room.live_kit_room,
2479 &session.user_id.to_string(),
2480 )
2481 .trace_err()?;
2482
2483 Some(LiveKitConnectionInfo {
2484 server_url: live_kit.url().into(),
2485 token,
2486 })
2487 });
2488
2489 response.send(proto::JoinRoomResponse {
2490 room: Some(joined_room.room.clone()),
2491 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2492 live_kit_connection_info,
2493 })?;
2494
2495 room_updated(&joined_room.room, &session.peer);
2496
2497 joined_room.into_inner()
2498 };
2499
2500 channel_updated(
2501 channel_id,
2502 &joined_room.room,
2503 &joined_room.channel_members,
2504 &session.peer,
2505 &*session.connection_pool().await,
2506 );
2507
2508 update_user_contacts(session.user_id, &session).await?;
2509
2510 Ok(())
2511}
2512
2513async fn join_channel_buffer(
2514 request: proto::JoinChannelBuffer,
2515 response: Response<proto::JoinChannelBuffer>,
2516 session: Session,
2517) -> Result<()> {
2518 let db = session.db().await;
2519 let channel_id = ChannelId::from_proto(request.channel_id);
2520
2521 let open_response = db
2522 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2523 .await?;
2524
2525 let replica_id = open_response.replica_id;
2526 let collaborators = open_response.collaborators.clone();
2527
2528 response.send(open_response)?;
2529
2530 let update = AddChannelBufferCollaborator {
2531 channel_id: channel_id.to_proto(),
2532 collaborator: Some(proto::Collaborator {
2533 user_id: session.user_id.to_proto(),
2534 peer_id: Some(session.connection_id.into()),
2535 replica_id,
2536 }),
2537 };
2538 channel_buffer_updated(
2539 session.connection_id,
2540 collaborators
2541 .iter()
2542 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2543 &update,
2544 &session.peer,
2545 );
2546
2547 Ok(())
2548}
2549
2550async fn update_channel_buffer(
2551 request: proto::UpdateChannelBuffer,
2552 session: Session,
2553) -> Result<()> {
2554 let db = session.db().await;
2555 let channel_id = ChannelId::from_proto(request.channel_id);
2556
2557 let collaborators = db
2558 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2559 .await?;
2560
2561 channel_buffer_updated(
2562 session.connection_id,
2563 collaborators,
2564 &proto::UpdateChannelBuffer {
2565 channel_id: channel_id.to_proto(),
2566 operations: request.operations,
2567 },
2568 &session.peer,
2569 );
2570 Ok(())
2571}
2572
2573async fn rejoin_channel_buffers(
2574 request: proto::RejoinChannelBuffers,
2575 response: Response<proto::RejoinChannelBuffers>,
2576 session: Session,
2577) -> Result<()> {
2578 let db = session.db().await;
2579 let buffers = db
2580 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2581 .await?;
2582
2583 for buffer in &buffers {
2584 let collaborators_to_notify = buffer
2585 .buffer
2586 .collaborators
2587 .iter()
2588 .filter_map(|c| Some(c.peer_id?.into()));
2589 channel_buffer_updated(
2590 session.connection_id,
2591 collaborators_to_notify,
2592 &proto::UpdateChannelBufferCollaborator {
2593 channel_id: buffer.buffer.channel_id,
2594 old_peer_id: Some(buffer.old_connection_id.into()),
2595 new_peer_id: Some(session.connection_id.into()),
2596 },
2597 &session.peer,
2598 );
2599 }
2600
2601 response.send(proto::RejoinChannelBuffersResponse {
2602 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2603 })?;
2604
2605 Ok(())
2606}
2607
2608async fn leave_channel_buffer(
2609 request: proto::LeaveChannelBuffer,
2610 response: Response<proto::LeaveChannelBuffer>,
2611 session: Session,
2612) -> Result<()> {
2613 let db = session.db().await;
2614 let channel_id = ChannelId::from_proto(request.channel_id);
2615
2616 let collaborators_to_notify = db
2617 .leave_channel_buffer(channel_id, session.connection_id)
2618 .await?;
2619
2620 response.send(Ack {})?;
2621
2622 channel_buffer_updated(
2623 session.connection_id,
2624 collaborators_to_notify,
2625 &proto::RemoveChannelBufferCollaborator {
2626 channel_id: channel_id.to_proto(),
2627 peer_id: Some(session.connection_id.into()),
2628 },
2629 &session.peer,
2630 );
2631
2632 Ok(())
2633}
2634
2635fn channel_buffer_updated<T: EnvelopedMessage>(
2636 sender_id: ConnectionId,
2637 collaborators: impl IntoIterator<Item = ConnectionId>,
2638 message: &T,
2639 peer: &Peer,
2640) {
2641 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2642 peer.send(peer_id.into(), message.clone())
2643 });
2644}
2645
2646async fn send_channel_message(
2647 request: proto::SendChannelMessage,
2648 response: Response<proto::SendChannelMessage>,
2649 session: Session,
2650) -> Result<()> {
2651 // Validate the message body.
2652 let body = request.body.trim().to_string();
2653 if body.len() > MAX_MESSAGE_LEN {
2654 return Err(anyhow!("message is too long"))?;
2655 }
2656 if body.is_empty() {
2657 return Err(anyhow!("message can't be blank"))?;
2658 }
2659
2660 let timestamp = OffsetDateTime::now_utc();
2661 let nonce = request
2662 .nonce
2663 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2664
2665 let channel_id = ChannelId::from_proto(request.channel_id);
2666 let (message_id, connection_ids) = session
2667 .db()
2668 .await
2669 .create_channel_message(
2670 channel_id,
2671 session.user_id,
2672 &body,
2673 timestamp,
2674 nonce.clone().into(),
2675 )
2676 .await?;
2677 let message = proto::ChannelMessage {
2678 sender_id: session.user_id.to_proto(),
2679 id: message_id.to_proto(),
2680 body,
2681 timestamp: timestamp.unix_timestamp() as u64,
2682 nonce: Some(nonce),
2683 };
2684 broadcast(Some(session.connection_id), connection_ids, |connection| {
2685 session.peer.send(
2686 connection,
2687 proto::ChannelMessageSent {
2688 channel_id: channel_id.to_proto(),
2689 message: Some(message.clone()),
2690 },
2691 )
2692 });
2693 response.send(proto::SendChannelMessageResponse {
2694 message: Some(message),
2695 })?;
2696 Ok(())
2697}
2698
2699async fn join_channel_chat(
2700 request: proto::JoinChannelChat,
2701 response: Response<proto::JoinChannelChat>,
2702 session: Session,
2703) -> Result<()> {
2704 let channel_id = ChannelId::from_proto(request.channel_id);
2705
2706 let db = session.db().await;
2707 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2708 .await?;
2709 let messages = db
2710 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2711 .await?;
2712 response.send(proto::JoinChannelChatResponse {
2713 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2714 messages,
2715 })?;
2716 Ok(())
2717}
2718
2719async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2720 let channel_id = ChannelId::from_proto(request.channel_id);
2721 session
2722 .db()
2723 .await
2724 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2725 .await?;
2726 Ok(())
2727}
2728
2729async fn get_channel_messages(
2730 request: proto::GetChannelMessages,
2731 response: Response<proto::GetChannelMessages>,
2732 session: Session,
2733) -> Result<()> {
2734 let channel_id = ChannelId::from_proto(request.channel_id);
2735 let messages = session
2736 .db()
2737 .await
2738 .get_channel_messages(
2739 channel_id,
2740 session.user_id,
2741 MESSAGE_COUNT_PER_PAGE,
2742 Some(MessageId::from_proto(request.before_message_id)),
2743 )
2744 .await?;
2745 response.send(proto::GetChannelMessagesResponse {
2746 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2747 messages,
2748 })?;
2749 Ok(())
2750}
2751
2752async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2753 let project_id = ProjectId::from_proto(request.project_id);
2754 let project_connection_ids = session
2755 .db()
2756 .await
2757 .project_connection_ids(project_id, session.connection_id)
2758 .await?;
2759 broadcast(
2760 Some(session.connection_id),
2761 project_connection_ids.iter().copied(),
2762 |connection_id| {
2763 session
2764 .peer
2765 .forward_send(session.connection_id, connection_id, request.clone())
2766 },
2767 );
2768 Ok(())
2769}
2770
2771async fn get_private_user_info(
2772 _request: proto::GetPrivateUserInfo,
2773 response: Response<proto::GetPrivateUserInfo>,
2774 session: Session,
2775) -> Result<()> {
2776 let db = session.db().await;
2777
2778 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2779 let user = db
2780 .get_user_by_id(session.user_id)
2781 .await?
2782 .ok_or_else(|| anyhow!("user not found"))?;
2783 let flags = db.get_user_flags(session.user_id).await?;
2784
2785 response.send(proto::GetPrivateUserInfoResponse {
2786 metrics_id,
2787 staff: user.admin,
2788 flags,
2789 })?;
2790 Ok(())
2791}
2792
2793fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2794 match message {
2795 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2796 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2797 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2798 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2799 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2800 code: frame.code.into(),
2801 reason: frame.reason,
2802 })),
2803 }
2804}
2805
2806fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2807 match message {
2808 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2809 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2810 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2811 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2812 AxumMessage::Close(frame) => {
2813 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2814 code: frame.code.into(),
2815 reason: frame.reason,
2816 }))
2817 }
2818 }
2819}
2820
2821fn build_initial_channels_update(
2822 channels: ChannelsForUser,
2823 channel_invites: Vec<db::Channel>,
2824) -> proto::UpdateChannels {
2825 let mut update = proto::UpdateChannels::default();
2826
2827 for channel in channels.channels {
2828 update.channels.push(proto::Channel {
2829 id: channel.id.to_proto(),
2830 name: channel.name,
2831 parent_id: channel.parent_id.map(|id| id.to_proto()),
2832 });
2833 }
2834
2835 for (channel_id, participants) in channels.channel_participants {
2836 update
2837 .channel_participants
2838 .push(proto::ChannelParticipants {
2839 channel_id: channel_id.to_proto(),
2840 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2841 });
2842 }
2843
2844 update
2845 .channel_permissions
2846 .extend(
2847 channels
2848 .channels_with_admin_privileges
2849 .into_iter()
2850 .map(|id| proto::ChannelPermission {
2851 channel_id: id.to_proto(),
2852 is_admin: true,
2853 }),
2854 );
2855
2856 for channel in channel_invites {
2857 update.channel_invitations.push(proto::Channel {
2858 id: channel.id.to_proto(),
2859 name: channel.name,
2860 parent_id: None,
2861 });
2862 }
2863
2864 update
2865}
2866
2867fn build_initial_contacts_update(
2868 contacts: Vec<db::Contact>,
2869 pool: &ConnectionPool,
2870) -> proto::UpdateContacts {
2871 let mut update = proto::UpdateContacts::default();
2872
2873 for contact in contacts {
2874 match contact {
2875 db::Contact::Accepted {
2876 user_id,
2877 should_notify,
2878 busy,
2879 } => {
2880 update
2881 .contacts
2882 .push(contact_for_user(user_id, should_notify, busy, &pool));
2883 }
2884 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2885 db::Contact::Incoming {
2886 user_id,
2887 should_notify,
2888 } => update
2889 .incoming_requests
2890 .push(proto::IncomingContactRequest {
2891 requester_id: user_id.to_proto(),
2892 should_notify,
2893 }),
2894 }
2895 }
2896
2897 update
2898}
2899
2900fn contact_for_user(
2901 user_id: UserId,
2902 should_notify: bool,
2903 busy: bool,
2904 pool: &ConnectionPool,
2905) -> proto::Contact {
2906 proto::Contact {
2907 user_id: user_id.to_proto(),
2908 online: pool.is_user_online(user_id),
2909 busy,
2910 should_notify,
2911 }
2912}
2913
2914fn room_updated(room: &proto::Room, peer: &Peer) {
2915 broadcast(
2916 None,
2917 room.participants
2918 .iter()
2919 .filter_map(|participant| Some(participant.peer_id?.into())),
2920 |peer_id| {
2921 peer.send(
2922 peer_id.into(),
2923 proto::RoomUpdated {
2924 room: Some(room.clone()),
2925 },
2926 )
2927 },
2928 );
2929}
2930
2931fn channel_updated(
2932 channel_id: ChannelId,
2933 room: &proto::Room,
2934 channel_members: &[UserId],
2935 peer: &Peer,
2936 pool: &ConnectionPool,
2937) {
2938 let participants = room
2939 .participants
2940 .iter()
2941 .map(|p| p.user_id)
2942 .collect::<Vec<_>>();
2943
2944 broadcast(
2945 None,
2946 channel_members
2947 .iter()
2948 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2949 |peer_id| {
2950 peer.send(
2951 peer_id.into(),
2952 proto::UpdateChannels {
2953 channel_participants: vec![proto::ChannelParticipants {
2954 channel_id: channel_id.to_proto(),
2955 participant_user_ids: participants.clone(),
2956 }],
2957 ..Default::default()
2958 },
2959 )
2960 },
2961 );
2962}
2963
2964async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2965 let db = session.db().await;
2966
2967 let contacts = db.get_contacts(user_id).await?;
2968 let busy = db.is_user_busy(user_id).await?;
2969
2970 let pool = session.connection_pool().await;
2971 let updated_contact = contact_for_user(user_id, false, busy, &pool);
2972 for contact in contacts {
2973 if let db::Contact::Accepted {
2974 user_id: contact_user_id,
2975 ..
2976 } = contact
2977 {
2978 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2979 session
2980 .peer
2981 .send(
2982 contact_conn_id,
2983 proto::UpdateContacts {
2984 contacts: vec![updated_contact.clone()],
2985 remove_contacts: Default::default(),
2986 incoming_requests: Default::default(),
2987 remove_incoming_requests: Default::default(),
2988 outgoing_requests: Default::default(),
2989 remove_outgoing_requests: Default::default(),
2990 },
2991 )
2992 .trace_err();
2993 }
2994 }
2995 }
2996 Ok(())
2997}
2998
2999async fn leave_room_for_session(session: &Session) -> Result<()> {
3000 let mut contacts_to_update = HashSet::default();
3001
3002 let room_id;
3003 let canceled_calls_to_user_ids;
3004 let live_kit_room;
3005 let delete_live_kit_room;
3006 let room;
3007 let channel_members;
3008 let channel_id;
3009
3010 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3011 contacts_to_update.insert(session.user_id);
3012
3013 for project in left_room.left_projects.values() {
3014 project_left(project, session);
3015 }
3016
3017 room_id = RoomId::from_proto(left_room.room.id);
3018 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3019 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3020 delete_live_kit_room = left_room.deleted;
3021 room = mem::take(&mut left_room.room);
3022 channel_members = mem::take(&mut left_room.channel_members);
3023 channel_id = left_room.channel_id;
3024
3025 room_updated(&room, &session.peer);
3026 } else {
3027 return Ok(());
3028 }
3029
3030 if let Some(channel_id) = channel_id {
3031 channel_updated(
3032 channel_id,
3033 &room,
3034 &channel_members,
3035 &session.peer,
3036 &*session.connection_pool().await,
3037 );
3038 }
3039
3040 {
3041 let pool = session.connection_pool().await;
3042 for canceled_user_id in canceled_calls_to_user_ids {
3043 for connection_id in pool.user_connection_ids(canceled_user_id) {
3044 session
3045 .peer
3046 .send(
3047 connection_id,
3048 proto::CallCanceled {
3049 room_id: room_id.to_proto(),
3050 },
3051 )
3052 .trace_err();
3053 }
3054 contacts_to_update.insert(canceled_user_id);
3055 }
3056 }
3057
3058 for contact_user_id in contacts_to_update {
3059 update_user_contacts(contact_user_id, &session).await?;
3060 }
3061
3062 if let Some(live_kit) = session.live_kit_client.as_ref() {
3063 live_kit
3064 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3065 .await
3066 .trace_err();
3067
3068 if delete_live_kit_room {
3069 live_kit.delete_room(live_kit_room).await.trace_err();
3070 }
3071 }
3072
3073 Ok(())
3074}
3075
3076async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3077 let left_channel_buffers = session
3078 .db()
3079 .await
3080 .leave_channel_buffers(session.connection_id)
3081 .await?;
3082
3083 for (channel_id, connections) in left_channel_buffers {
3084 channel_buffer_updated(
3085 session.connection_id,
3086 connections,
3087 &proto::RemoveChannelBufferCollaborator {
3088 channel_id: channel_id.to_proto(),
3089 peer_id: Some(session.connection_id.into()),
3090 },
3091 &session.peer,
3092 );
3093 }
3094
3095 Ok(())
3096}
3097
3098fn project_left(project: &db::LeftProject, session: &Session) {
3099 for connection_id in &project.connection_ids {
3100 if project.host_user_id == session.user_id {
3101 session
3102 .peer
3103 .send(
3104 *connection_id,
3105 proto::UnshareProject {
3106 project_id: project.id.to_proto(),
3107 },
3108 )
3109 .trace_err();
3110 } else {
3111 session
3112 .peer
3113 .send(
3114 *connection_id,
3115 proto::RemoveProjectCollaborator {
3116 project_id: project.id.to_proto(),
3117 peer_id: Some(session.connection_id.into()),
3118 },
3119 )
3120 .trace_err();
3121 }
3122 }
3123}
3124
3125pub trait ResultExt {
3126 type Ok;
3127
3128 fn trace_err(self) -> Option<Self::Ok>;
3129}
3130
3131impl<T, E> ResultExt for Result<T, E>
3132where
3133 E: std::fmt::Debug,
3134{
3135 type Ok = T;
3136
3137 fn trace_err(self) -> Option<T> {
3138 match self {
3139 Ok(value) => Some(value),
3140 Err(error) => {
3141 tracing::error!("{:?}", error);
3142 None
3143 }
3144 }
3145 }
3146}