1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
/// How long `connection_lost` sleeps after a connection drops before it
/// releases the room and channel-buffer resources held by that connection.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long the task spawned by `Server::start` sleeps before it begins
/// clearing stale rooms and channel buffers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
69
// NOTE(review): neither constant is referenced in this part of the file. The
// names suggest a page size and a length cap for channel chat messages —
// confirm against the handlers that use them.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
72
// Prometheus gauges, registered on first access. Registration failure panics
// via `unwrap`.
lazy_static! {
    // Set by `handle_metrics` to the number of non-admin connections.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Set by `handle_metrics` from `Database::project_count_excluding_admins`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
82
/// A type-erased message handler: it receives the boxed envelope together with
/// the `Session` the message arrived on and returns a boxed future that
/// performs the handling.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
/// Handle given to a request handler so it can reply to a request of type `R`.
struct Response<R> {
    /// Peer through which the reply is sent.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Raised when a reply is sent; shared with the code that created this
    /// handle so it can tell whether the handler replied.
    responded: Arc<AtomicBool>,
}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
/// Per-connection context handed to every message handler.
#[derive(Clone)]
struct Session {
    /// User that owns the connection.
    user_id: UserId,
    /// Connection the session was created for.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; acquire it with `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Peer used to send messages to connections.
    peer: Arc<Peer>,
    /// Connection pool shared with the `Server`; acquire it with
    /// `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client, if one is configured.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used for sleeping and spawning.
    executor: Executor,
}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
/// Newtype around a shared `Database`; dereferences to the database itself.
struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
/// The RPC server: owns the peer, the connection pool, and the table of
/// message handlers.
pub struct Server {
    /// Identifier of this server instance; behind a mutex because the
    /// test-only `reset` replaces it.
    id: parking_lot::Mutex<ServerId>,
    /// Peer that manages the underlying connections.
    peer: Arc<Peer>,
    /// Pool of currently registered connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Application state (database, config, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for sleeping and spawning background work.
    executor: Executor,
    /// Registered handlers, keyed by the `TypeId` of the message type.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Signalled by `teardown`; connection tasks subscribe to it.
    teardown: watch::Sender<()>,
}
160
/// Guard over the locked `ConnectionPool`.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// Marker of a non-`Send` type, so the guard itself is never `Send`.
    _not_send: PhantomData<Rc<()>>,
}
165
/// Serializable view of the server's peer and connection pool; it holds the
/// pool lock for as long as it is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool, serialized through its `Deref` target.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
/// Creates a server with the given id, application state and executor, and
/// registers every RPC handler.
///
/// Request handlers (`add_request_handler`) must reply to the sender;
/// message handlers (`add_message_handler`) are fire-and-forget.
///
/// # Panics
///
/// Panics if two handlers are registered for the same message type (see
/// `add_handler`).
pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
    let mut server = Self {
        id: parking_lot::Mutex::new(id),
        peer: Peer::new(id.0 as u32),
        app_state,
        executor,
        connection_pool: Default::default(),
        handlers: Default::default(),
        // Only the sender is kept; receivers are created with `subscribe`.
        teardown: watch::channel(()).0,
    };

    server
        .add_request_handler(ping)
        .add_request_handler(create_room)
        .add_request_handler(join_room)
        .add_request_handler(rejoin_room)
        .add_request_handler(leave_room)
        .add_request_handler(call)
        .add_request_handler(cancel_call)
        .add_message_handler(decline_call)
        .add_request_handler(update_participant_location)
        .add_request_handler(share_project)
        .add_message_handler(unshare_project)
        .add_request_handler(join_project)
        .add_message_handler(leave_project)
        .add_request_handler(update_project)
        .add_request_handler(update_worktree)
        .add_message_handler(start_language_server)
        .add_message_handler(update_language_server)
        .add_message_handler(update_diagnostic_summary)
        .add_message_handler(update_worktree_settings)
        .add_message_handler(refresh_inlay_hints)
        .add_request_handler(forward_project_request::<proto::GetHover>)
        .add_request_handler(forward_project_request::<proto::GetDefinition>)
        .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
        .add_request_handler(forward_project_request::<proto::GetReferences>)
        .add_request_handler(forward_project_request::<proto::SearchProject>)
        .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
        .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
        .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
        .add_request_handler(forward_project_request::<proto::OpenBufferById>)
        .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
        .add_request_handler(forward_project_request::<proto::GetCompletions>)
        .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
        .add_request_handler(forward_project_request::<proto::GetCodeActions>)
        .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
        .add_request_handler(forward_project_request::<proto::PrepareRename>)
        .add_request_handler(forward_project_request::<proto::PerformRename>)
        .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
        .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
        .add_request_handler(forward_project_request::<proto::FormatBuffers>)
        .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
        .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
        .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
        .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
        .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
        .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
        .add_request_handler(forward_project_request::<proto::InlayHints>)
        .add_message_handler(create_buffer_for_peer)
        .add_request_handler(update_buffer)
        .add_message_handler(update_buffer_file)
        .add_message_handler(buffer_reloaded)
        .add_message_handler(buffer_saved)
        .add_request_handler(forward_project_request::<proto::SaveBuffer>)
        .add_request_handler(get_users)
        .add_request_handler(fuzzy_search_users)
        .add_request_handler(request_contact)
        .add_request_handler(remove_contact)
        .add_request_handler(respond_to_contact_request)
        .add_request_handler(create_channel)
        .add_request_handler(remove_channel)
        .add_request_handler(invite_channel_member)
        .add_request_handler(remove_channel_member)
        .add_request_handler(set_channel_member_admin)
        .add_request_handler(rename_channel)
        .add_request_handler(join_channel_buffer)
        .add_request_handler(leave_channel_buffer)
        .add_message_handler(update_channel_buffer)
        .add_request_handler(rejoin_channel_buffers)
        .add_request_handler(get_channel_members)
        .add_request_handler(respond_to_channel_invite)
        .add_request_handler(join_channel)
        .add_request_handler(join_channel_chat)
        .add_message_handler(leave_channel_chat)
        .add_request_handler(send_channel_message)
        .add_request_handler(get_channel_messages)
        .add_request_handler(follow)
        .add_message_handler(unfollow)
        .add_message_handler(update_followers)
        .add_message_handler(update_diff_base)
        .add_request_handler(get_private_user_info);

    Arc::new(server)
}
277
/// Spawns a detached background task that, after `CLEANUP_TIMEOUT`, clears
/// the stale rooms and channel buffers reported by
/// `stale_server_resource_ids` and then deletes stale server records.
///
/// This function itself returns `Ok(())` immediately after spawning; every
/// failure inside the task is logged through `trace_err` and otherwise
/// ignored.
pub async fn start(&self) -> Result<()> {
    let server_id = *self.id.lock();
    let app_state = self.app_state.clone();
    let peer = self.peer.clone();
    let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
    let pool = self.connection_pool.clone();
    let live_kit_client = self.app_state.live_kit_client.clone();

    let span = info_span!("start server");
    self.executor.spawn_detached(
        async move {
            tracing::info!("waiting for cleanup timeout");
            timeout.await;
            tracing::info!("cleanup timeout expired, retrieving stale rooms");
            if let Some((room_ids, channel_ids)) = app_state
                .db
                .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
                .await
                .trace_err()
            {
                tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                tracing::info!(
                    stale_channel_buffer_count = channel_ids.len(),
                    "retrieved stale channel buffers"
                );

                // Drop stale collaborators from each channel buffer and send
                // every removal message to each returned connection.
                for channel_id in channel_ids {
                    if let Some(refreshed_channel_buffer) = app_state
                        .db
                        .clear_stale_channel_buffer_collaborators(channel_id, server_id)
                        .await
                        .trace_err()
                    {
                        for connection_id in refreshed_channel_buffer.connection_ids {
                            for message in &refreshed_channel_buffer.removed_collaborators {
                                peer.send(connection_id, message.clone()).trace_err();
                            }
                        }
                    }
                }

                for room_id in room_ids {
                    // Filled in only when the room was refreshed successfully;
                    // otherwise the steps below operate on empty defaults.
                    let mut contacts_to_update = HashSet::default();
                    let mut canceled_calls_to_user_ids = Vec::new();
                    let mut live_kit_room = String::new();
                    let mut delete_live_kit_room = false;

                    if let Some(mut refreshed_room) = app_state
                        .db
                        .clear_stale_room_participants(room_id, server_id)
                        .await
                        .trace_err()
                    {
                        tracing::info!(
                            room_id = room_id.0,
                            new_participant_count = refreshed_room.room.participants.len(),
                            "refreshed room"
                        );
                        room_updated(&refreshed_room.room, &peer);
                        if let Some(channel_id) = refreshed_room.channel_id {
                            channel_updated(
                                channel_id,
                                &refreshed_room.room,
                                &refreshed_room.channel_members,
                                &peer,
                                &*pool.lock(),
                            );
                        }
                        contacts_to_update
                            .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                        contacts_to_update
                            .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                        canceled_calls_to_user_ids =
                            mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                        live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                        delete_live_kit_room = refreshed_room.room.participants.is_empty();
                    }

                    // The pool lock is scoped to this block so it is released
                    // before the next `.await`.
                    {
                        let pool = pool.lock();
                        for canceled_user_id in canceled_calls_to_user_ids {
                            for connection_id in pool.user_connection_ids(canceled_user_id) {
                                peer.send(
                                    connection_id,
                                    proto::CallCanceled {
                                        room_id: room_id.to_proto(),
                                    },
                                )
                                .trace_err();
                            }
                        }
                    }

                    // Push the refreshed contact entry of each affected user
                    // to the connections of that user's accepted contacts.
                    for user_id in contacts_to_update {
                        let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                        let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                        if let Some((busy, contacts)) = busy.zip(contacts) {
                            let pool = pool.lock();
                            let updated_contact = contact_for_user(user_id, false, busy, &pool);
                            for contact in contacts {
                                if let db::Contact::Accepted {
                                    user_id: contact_user_id,
                                    ..
                                } = contact
                                {
                                    for contact_conn_id in
                                        pool.user_connection_ids(contact_user_id)
                                    {
                                        peer.send(
                                            contact_conn_id,
                                            proto::UpdateContacts {
                                                contacts: vec![updated_contact.clone()],
                                                remove_contacts: Default::default(),
                                                incoming_requests: Default::default(),
                                                remove_incoming_requests: Default::default(),
                                                outgoing_requests: Default::default(),
                                                remove_outgoing_requests: Default::default(),
                                            },
                                        )
                                        .trace_err();
                                    }
                                }
                            }
                        }
                    }

                    // Delete the LiveKit room only when the refreshed room has
                    // no participants left.
                    if let Some(live_kit) = live_kit_client.as_ref() {
                        if delete_live_kit_room {
                            live_kit.delete_room(live_kit_room).await.trace_err();
                        }
                    }
                }
            }

            // Runs whether or not the stale resource ids could be retrieved.
            app_state
                .db
                .delete_stale_servers(&app_state.config.zed_environment, server_id)
                .await
                .trace_err();
        }
        .instrument(span),
    );
    Ok(())
}
422
/// Tears down the peer, resets the connection pool, and notifies every
/// subscriber of the teardown channel.
pub fn teardown(&self) {
    self.peer.teardown();
    self.connection_pool.lock().reset();
    // The send result is deliberately discarded.
    let _ = self.teardown.send(());
}
428
/// Test-only: tears the server down and then gives both the server and its
/// peer the new `id`.
#[cfg(test)]
pub fn reset(&self, id: ServerId) {
    self.teardown();
    *self.id.lock() = id;
    self.peer.reset(id.0 as u32);
}
435
/// Test-only: returns a copy of the server's current id.
#[cfg(test)]
pub fn id(&self) -> ServerId {
    let current = self.id.lock();
    *current
}
440
441 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
442 where
443 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
444 Fut: 'static + Send + Future<Output = Result<()>>,
445 M: EnvelopedMessage,
446 {
447 let prev_handler = self.handlers.insert(
448 TypeId::of::<M>(),
449 Box::new(move |envelope, session| {
450 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
451 let span = info_span!(
452 "handle message",
453 payload_type = envelope.payload_type_name()
454 );
455 span.in_scope(|| {
456 tracing::info!(
457 payload_type = envelope.payload_type_name(),
458 "message received"
459 );
460 });
461 let start_time = Instant::now();
462 let future = (handler)(*envelope, session);
463 async move {
464 let result = future.await;
465 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
466 match result {
467 Err(error) => {
468 tracing::error!(%error, ?duration_ms, "error handling message")
469 }
470 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
471 }
472 }
473 .instrument(span)
474 .boxed()
475 }),
476 );
477 if prev_handler.is_some() {
478 panic!("registered a handler for the same message twice");
479 }
480 self
481 }
482
483 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
484 where
485 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
486 Fut: 'static + Send + Future<Output = Result<()>>,
487 M: EnvelopedMessage,
488 {
489 self.add_handler(move |envelope, session| handler(envelope.payload, session));
490 self
491 }
492
493 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
494 where
495 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
496 Fut: Send + Future<Output = Result<()>>,
497 M: RequestMessage,
498 {
499 let handler = Arc::new(handler);
500 self.add_handler(move |envelope, session| {
501 let receipt = envelope.receipt();
502 let handler = handler.clone();
503 async move {
504 let peer = session.peer.clone();
505 let responded = Arc::new(AtomicBool::default());
506 let response = Response {
507 peer: peer.clone(),
508 responded: responded.clone(),
509 receipt,
510 };
511 match (handler)(envelope.payload, response, session).await {
512 Ok(()) => {
513 if responded.load(std::sync::atomic::Ordering::SeqCst) {
514 Ok(())
515 } else {
516 Err(anyhow!("handler did not send a response"))?
517 }
518 }
519 Err(error) => {
520 peer.respond_with_error(
521 receipt,
522 proto::Error {
523 message: error.to_string(),
524 },
525 )?;
526 Err(error)
527 }
528 }
529 }
530 })
531 }
532
/// Returns a future that drives a single client connection from handshake
/// to sign-out.
///
/// The future registers the connection with the peer, sends the initial
/// state (hello, contacts, channels, invite info, any incoming call), then
/// dispatches incoming messages to the registered handlers until the I/O
/// task finishes, the incoming stream ends, or the server is torn down.
///
/// * `address` - remote address, used only in log output.
/// * `send_connection_id` - if present, receives the id assigned to the
///   connection as soon as it is known.
pub fn handle_connection(
    self: &Arc<Self>,
    connection: Connection,
    address: String,
    user: User,
    mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
    executor: Executor,
) -> impl Future<Output = Result<()>> {
    let this = self.clone();
    let user_id = user.id;
    let login = user.github_login;
    let span = info_span!("handle connection", %user_id, %login, %address);
    let mut teardown = self.teardown.subscribe();
    async move {
        let (connection_id, handle_io, mut incoming_rx) = this
            .peer
            .add_connection(connection, {
                let executor = executor.clone();
                move |duration| executor.sleep(duration)
            });

        tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
        this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
        tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

        if let Some(send_connection_id) = send_connection_id.take() {
            let _ = send_connection_id.send(connection_id);
        }

        // On the user's first connection, send `ShowContacts` and record that
        // the user has connected once.
        if !user.connected_once {
            this.peer.send(connection_id, proto::ShowContacts {})?;
            this.app_state.db.set_user_connected_once(user_id, true).await?;
        }

        let (contacts, invite_code, channels_for_user, channel_invites) = future::try_join4(
            this.app_state.db.get_contacts(user_id),
            this.app_state.db.get_invite_code_for_user(user_id),
            this.app_state.db.get_channels_for_user(user_id),
            this.app_state.db.get_channel_invites_for_user(user_id)
        ).await?;

        // The pool lock is scoped to this block so it is released before the
        // next `.await`.
        {
            let mut pool = this.connection_pool.lock();
            pool.add_connection(connection_id, user_id, user.admin);
            this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
            this.peer.send(connection_id, build_initial_channels_update(
                channels_for_user,
                channel_invites
            ))?;

            if let Some((code, count)) = invite_code {
                this.peer.send(connection_id, proto::UpdateInviteInfo {
                    url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                    count: count as u32,
                })?;
            }
        }

        if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
            this.peer.send(connection_id, incoming_call)?;
        }

        let session = Session {
            user_id,
            connection_id,
            db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
            peer: this.peer.clone(),
            connection_pool: this.connection_pool.clone(),
            live_kit_client: this.app_state.live_kit_client.clone(),
            executor: executor.clone(),
        };
        update_user_contacts(user_id, &session).await?;

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);

        // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
        // This prevents deadlocks when e.g., client A performs a request to client B and
        // client B performs a request to client A. If both clients stop processing further
        // messages until their respective request completes, they won't have a chance to
        // respond to the other client's request and cause a deadlock.
        //
        // This arrangement ensures we will attempt to process earlier messages first, but fall
        // back to processing messages arrived later in the spirit of making progress.
        let mut foreground_message_handlers = FuturesUnordered::new();
        // Caps the number of in-flight handlers for this connection: a permit
        // is acquired before the next message is read and released when its
        // handler finishes.
        let concurrent_handlers = Arc::new(Semaphore::new(256));
        loop {
            let next_message = async {
                let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                let message = incoming_rx.next().await;
                (permit, message)
            }.fuse();
            futures::pin_mut!(next_message);
            futures::select_biased! {
                // On teardown, return right away without running the
                // sign-out path below.
                _ = teardown.changed().fuse() => return Ok(()),
                result = handle_io => {
                    if let Err(error) = result {
                        tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                    }
                    break;
                }
                _ = foreground_message_handlers.next() => {}
                next_message = next_message => {
                    let (permit, message) = next_message;
                    if let Some(message) = message {
                        let type_name = message.payload_type_name();
                        let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                        let span_enter = span.enter();
                        if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                            let is_background = message.is_background();
                            let handle_message = (handler)(message, session.clone());
                            drop(span_enter);

                            let handle_message = async move {
                                handle_message.await;
                                drop(permit);
                            }.instrument(span);
                            // Background messages are spawned detached;
                            // foreground ones are polled by this loop.
                            if is_background {
                                executor.spawn_detached(handle_message);
                            } else {
                                foreground_message_handlers.push(handle_message);
                            }
                        } else {
                            tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                        }
                    } else {
                        tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                        break;
                    }
                }
            }
        }

        // Pending foreground handlers are dropped, not awaited, before
        // signing out.
        drop(foreground_message_handlers);
        tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
        if let Err(error) = connection_lost(session, teardown, executor).await {
            tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
        }

        Ok(())
    }.instrument(span)
}
675
676 pub async fn invite_code_redeemed(
677 self: &Arc<Self>,
678 inviter_id: UserId,
679 invitee_id: UserId,
680 ) -> Result<()> {
681 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
682 if let Some(code) = &user.invite_code {
683 let pool = self.connection_pool.lock();
684 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
685 for connection_id in pool.user_connection_ids(inviter_id) {
686 self.peer.send(
687 connection_id,
688 proto::UpdateContacts {
689 contacts: vec![invitee_contact.clone()],
690 ..Default::default()
691 },
692 )?;
693 self.peer.send(
694 connection_id,
695 proto::UpdateInviteInfo {
696 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
697 count: user.invite_count as u32,
698 },
699 )?;
700 }
701 }
702 }
703 Ok(())
704 }
705
706 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
707 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
708 if let Some(invite_code) = &user.invite_code {
709 let pool = self.connection_pool.lock();
710 for connection_id in pool.user_connection_ids(user_id) {
711 self.peer.send(
712 connection_id,
713 proto::UpdateInviteInfo {
714 url: format!(
715 "{}{}",
716 self.app_state.config.invite_link_prefix, invite_code
717 ),
718 count: user.invite_count as u32,
719 },
720 )?;
721 }
722 }
723 }
724 Ok(())
725 }
726
727 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
728 ServerSnapshot {
729 connection_pool: ConnectionPoolGuard {
730 guard: self.connection_pool.lock(),
731 _not_send: PhantomData,
732 },
733 peer: &self.peer,
734 }
735 }
736}
737
738impl<'a> Deref for ConnectionPoolGuard<'a> {
739 type Target = ConnectionPool;
740
741 fn deref(&self) -> &Self::Target {
742 &*self.guard
743 }
744}
745
746impl<'a> DerefMut for ConnectionPoolGuard<'a> {
747 fn deref_mut(&mut self) -> &mut Self::Target {
748 &mut *self.guard
749 }
750}
751
impl<'a> Drop for ConnectionPoolGuard<'a> {
    /// In test builds, runs `check_invariants` each time a guard is released;
    /// in other builds this does nothing.
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
758
759fn broadcast<F>(
760 sender_id: Option<ConnectionId>,
761 receiver_ids: impl IntoIterator<Item = ConnectionId>,
762 mut f: F,
763) where
764 F: FnMut(ConnectionId) -> anyhow::Result<()>,
765{
766 for receiver_id in receiver_ids {
767 if Some(receiver_id) != sender_id {
768 if let Err(error) = f(receiver_id) {
769 tracing::error!("failed to send to {:?} {}", receiver_id, error);
770 }
771 }
772 }
773}
774
lazy_static! {
    // Name of the header decoded and encoded by `ProtocolVersion`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
778
/// Typed value of the `x-zed-protocol-version` header.
pub struct ProtocolVersion(u32);
780
781impl Header for ProtocolVersion {
782 fn name() -> &'static HeaderName {
783 &ZED_PROTOCOL_VERSION
784 }
785
786 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
787 where
788 Self: Sized,
789 I: Iterator<Item = &'i axum::http::HeaderValue>,
790 {
791 let version = values
792 .next()
793 .ok_or_else(axum::headers::Error::invalid)?
794 .to_str()
795 .map_err(|_| axum::headers::Error::invalid())?
796 .parse()
797 .map_err(|_| axum::headers::Error::invalid())?;
798 Ok(Self(version))
799 }
800
801 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
802 values.extend([self.0.to_string().parse().unwrap()]);
803 }
804}
805
806pub fn routes(server: Arc<Server>) -> Router<Body> {
807 Router::new()
808 .route("/rpc", get(handle_websocket_request))
809 .layer(
810 ServiceBuilder::new()
811 .layer(Extension(server.app_state.clone()))
812 .layer(middleware::from_fn(auth::validate_header)),
813 )
814 .route("/metrics", get(handle_metrics))
815 .layer(Extension(server))
816}
817
818pub async fn handle_websocket_request(
819 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
820 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
821 Extension(server): Extension<Arc<Server>>,
822 Extension(user): Extension<User>,
823 ws: WebSocketUpgrade,
824) -> axum::response::Response {
825 if protocol_version != rpc::PROTOCOL_VERSION {
826 return (
827 StatusCode::UPGRADE_REQUIRED,
828 "client must be upgraded".to_string(),
829 )
830 .into_response();
831 }
832 let socket_address = socket_address.to_string();
833 ws.on_upgrade(move |socket| {
834 use util::ResultExt;
835 let socket = socket
836 .map_ok(to_tungstenite_message)
837 .err_into()
838 .with(|message| async move { Ok(to_axum_message(message)) });
839 let connection = Connection::new(Box::pin(socket));
840 async move {
841 server
842 .handle_connection(connection, socket_address, user, None, Executor::Production)
843 .await
844 .log_err();
845 }
846 })
847}
848
849pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
850 let connections = server
851 .connection_pool
852 .lock()
853 .connections()
854 .filter(|connection| !connection.admin)
855 .count();
856
857 METRIC_CONNECTIONS.set(connections as _);
858
859 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
860 METRIC_SHARED_PROJECTS.set(shared_projects as _);
861
862 let encoder = prometheus::TextEncoder::new();
863 let metric_families = prometheus::gather();
864 let encoded_metrics = encoder
865 .encode_to_string(&metric_families)
866 .map_err(|err| anyhow!("{}", err))?;
867 Ok(encoded_metrics)
868}
869
870#[instrument(err, skip(executor))]
871async fn connection_lost(
872 session: Session,
873 mut teardown: watch::Receiver<()>,
874 executor: Executor,
875) -> Result<()> {
876 session.peer.disconnect(session.connection_id);
877 session
878 .connection_pool()
879 .await
880 .remove_connection(session.connection_id)?;
881
882 session
883 .db()
884 .await
885 .connection_lost(session.connection_id)
886 .await
887 .trace_err();
888
889 futures::select_biased! {
890 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
891 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
892 leave_room_for_session(&session).await.trace_err();
893 leave_channel_buffers_for_session(&session)
894 .await
895 .trace_err();
896
897 if !session
898 .connection_pool()
899 .await
900 .is_user_online(session.user_id)
901 {
902 let db = session.db().await;
903 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
904 room_updated(&room, &session.peer);
905 }
906 }
907 update_user_contacts(session.user_id, &session).await?;
908
909
910 }
911 _ = teardown.changed().fuse() => {}
912 }
913
914 Ok(())
915}
916
917async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
918 response.send(proto::Ack {})?;
919 Ok(())
920}
921
922async fn create_room(
923 _request: proto::CreateRoom,
924 response: Response<proto::CreateRoom>,
925 session: Session,
926) -> Result<()> {
927 let live_kit_room = nanoid::nanoid!(30);
928
929 let live_kit_connection_info = {
930 let live_kit_room = live_kit_room.clone();
931 let live_kit = session.live_kit_client.as_ref();
932
933 util::async_iife!({
934 let live_kit = live_kit?;
935
936 live_kit
937 .create_room(live_kit_room.clone())
938 .await
939 .trace_err()?;
940
941 let token = live_kit
942 .room_token(&live_kit_room, &session.user_id.to_string())
943 .trace_err()?;
944
945 Some(proto::LiveKitConnectionInfo {
946 server_url: live_kit.url().into(),
947 token,
948 })
949 })
950 }
951 .await;
952
953 let room = session
954 .db()
955 .await
956 .create_room(session.user_id, session.connection_id, &live_kit_room)
957 .await?;
958
959 response.send(proto::CreateRoomResponse {
960 room: Some(room.clone()),
961 live_kit_connection_info,
962 })?;
963
964 update_user_contacts(session.user_id, &session).await?;
965 Ok(())
966}
967
968async fn join_room(
969 request: proto::JoinRoom,
970 response: Response<proto::JoinRoom>,
971 session: Session,
972) -> Result<()> {
973 let room_id = RoomId::from_proto(request.id);
974 let joined_room = {
975 let room = session
976 .db()
977 .await
978 .join_room(room_id, session.user_id, session.connection_id)
979 .await?;
980 room_updated(&room.room, &session.peer);
981 room.into_inner()
982 };
983
984 if let Some(channel_id) = joined_room.channel_id {
985 channel_updated(
986 channel_id,
987 &joined_room.room,
988 &joined_room.channel_members,
989 &session.peer,
990 &*session.connection_pool().await,
991 )
992 }
993
994 for connection_id in session
995 .connection_pool()
996 .await
997 .user_connection_ids(session.user_id)
998 {
999 session
1000 .peer
1001 .send(
1002 connection_id,
1003 proto::CallCanceled {
1004 room_id: room_id.to_proto(),
1005 },
1006 )
1007 .trace_err();
1008 }
1009
1010 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1011 if let Some(token) = live_kit
1012 .room_token(
1013 &joined_room.room.live_kit_room,
1014 &session.user_id.to_string(),
1015 )
1016 .trace_err()
1017 {
1018 Some(proto::LiveKitConnectionInfo {
1019 server_url: live_kit.url().into(),
1020 token,
1021 })
1022 } else {
1023 None
1024 }
1025 } else {
1026 None
1027 };
1028
1029 response.send(proto::JoinRoomResponse {
1030 room: Some(joined_room.room),
1031 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1032 live_kit_connection_info,
1033 })?;
1034
1035 update_user_contacts(session.user_id, &session).await?;
1036 Ok(())
1037}
1038
1039async fn rejoin_room(
1040 request: proto::RejoinRoom,
1041 response: Response<proto::RejoinRoom>,
1042 session: Session,
1043) -> Result<()> {
1044 let room;
1045 let channel_id;
1046 let channel_members;
1047 {
1048 let mut rejoined_room = session
1049 .db()
1050 .await
1051 .rejoin_room(request, session.user_id, session.connection_id)
1052 .await?;
1053
1054 response.send(proto::RejoinRoomResponse {
1055 room: Some(rejoined_room.room.clone()),
1056 reshared_projects: rejoined_room
1057 .reshared_projects
1058 .iter()
1059 .map(|project| proto::ResharedProject {
1060 id: project.id.to_proto(),
1061 collaborators: project
1062 .collaborators
1063 .iter()
1064 .map(|collaborator| collaborator.to_proto())
1065 .collect(),
1066 })
1067 .collect(),
1068 rejoined_projects: rejoined_room
1069 .rejoined_projects
1070 .iter()
1071 .map(|rejoined_project| proto::RejoinedProject {
1072 id: rejoined_project.id.to_proto(),
1073 worktrees: rejoined_project
1074 .worktrees
1075 .iter()
1076 .map(|worktree| proto::WorktreeMetadata {
1077 id: worktree.id,
1078 root_name: worktree.root_name.clone(),
1079 visible: worktree.visible,
1080 abs_path: worktree.abs_path.clone(),
1081 })
1082 .collect(),
1083 collaborators: rejoined_project
1084 .collaborators
1085 .iter()
1086 .map(|collaborator| collaborator.to_proto())
1087 .collect(),
1088 language_servers: rejoined_project.language_servers.clone(),
1089 })
1090 .collect(),
1091 })?;
1092 room_updated(&rejoined_room.room, &session.peer);
1093
1094 for project in &rejoined_room.reshared_projects {
1095 for collaborator in &project.collaborators {
1096 session
1097 .peer
1098 .send(
1099 collaborator.connection_id,
1100 proto::UpdateProjectCollaborator {
1101 project_id: project.id.to_proto(),
1102 old_peer_id: Some(project.old_connection_id.into()),
1103 new_peer_id: Some(session.connection_id.into()),
1104 },
1105 )
1106 .trace_err();
1107 }
1108
1109 broadcast(
1110 Some(session.connection_id),
1111 project
1112 .collaborators
1113 .iter()
1114 .map(|collaborator| collaborator.connection_id),
1115 |connection_id| {
1116 session.peer.forward_send(
1117 session.connection_id,
1118 connection_id,
1119 proto::UpdateProject {
1120 project_id: project.id.to_proto(),
1121 worktrees: project.worktrees.clone(),
1122 },
1123 )
1124 },
1125 );
1126 }
1127
1128 for project in &rejoined_room.rejoined_projects {
1129 for collaborator in &project.collaborators {
1130 session
1131 .peer
1132 .send(
1133 collaborator.connection_id,
1134 proto::UpdateProjectCollaborator {
1135 project_id: project.id.to_proto(),
1136 old_peer_id: Some(project.old_connection_id.into()),
1137 new_peer_id: Some(session.connection_id.into()),
1138 },
1139 )
1140 .trace_err();
1141 }
1142 }
1143
1144 for project in &mut rejoined_room.rejoined_projects {
1145 for worktree in mem::take(&mut project.worktrees) {
1146 #[cfg(any(test, feature = "test-support"))]
1147 const MAX_CHUNK_SIZE: usize = 2;
1148 #[cfg(not(any(test, feature = "test-support")))]
1149 const MAX_CHUNK_SIZE: usize = 256;
1150
1151 // Stream this worktree's entries.
1152 let message = proto::UpdateWorktree {
1153 project_id: project.id.to_proto(),
1154 worktree_id: worktree.id,
1155 abs_path: worktree.abs_path.clone(),
1156 root_name: worktree.root_name,
1157 updated_entries: worktree.updated_entries,
1158 removed_entries: worktree.removed_entries,
1159 scan_id: worktree.scan_id,
1160 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1161 updated_repositories: worktree.updated_repositories,
1162 removed_repositories: worktree.removed_repositories,
1163 };
1164 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1165 session.peer.send(session.connection_id, update.clone())?;
1166 }
1167
1168 // Stream this worktree's diagnostics.
1169 for summary in worktree.diagnostic_summaries {
1170 session.peer.send(
1171 session.connection_id,
1172 proto::UpdateDiagnosticSummary {
1173 project_id: project.id.to_proto(),
1174 worktree_id: worktree.id,
1175 summary: Some(summary),
1176 },
1177 )?;
1178 }
1179
1180 for settings_file in worktree.settings_files {
1181 session.peer.send(
1182 session.connection_id,
1183 proto::UpdateWorktreeSettings {
1184 project_id: project.id.to_proto(),
1185 worktree_id: worktree.id,
1186 path: settings_file.path,
1187 content: Some(settings_file.content),
1188 },
1189 )?;
1190 }
1191 }
1192
1193 for language_server in &project.language_servers {
1194 session.peer.send(
1195 session.connection_id,
1196 proto::UpdateLanguageServer {
1197 project_id: project.id.to_proto(),
1198 language_server_id: language_server.id,
1199 variant: Some(
1200 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1201 proto::LspDiskBasedDiagnosticsUpdated {},
1202 ),
1203 ),
1204 },
1205 )?;
1206 }
1207 }
1208
1209 let rejoined_room = rejoined_room.into_inner();
1210
1211 room = rejoined_room.room;
1212 channel_id = rejoined_room.channel_id;
1213 channel_members = rejoined_room.channel_members;
1214 }
1215
1216 if let Some(channel_id) = channel_id {
1217 channel_updated(
1218 channel_id,
1219 &room,
1220 &channel_members,
1221 &session.peer,
1222 &*session.connection_pool().await,
1223 );
1224 }
1225
1226 update_user_contacts(session.user_id, &session).await?;
1227 Ok(())
1228}
1229
1230async fn leave_room(
1231 _: proto::LeaveRoom,
1232 response: Response<proto::LeaveRoom>,
1233 session: Session,
1234) -> Result<()> {
1235 leave_room_for_session(&session).await?;
1236 response.send(proto::Ack {})?;
1237 Ok(())
1238}
1239
1240async fn call(
1241 request: proto::Call,
1242 response: Response<proto::Call>,
1243 session: Session,
1244) -> Result<()> {
1245 let room_id = RoomId::from_proto(request.room_id);
1246 let calling_user_id = session.user_id;
1247 let calling_connection_id = session.connection_id;
1248 let called_user_id = UserId::from_proto(request.called_user_id);
1249 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1250 if !session
1251 .db()
1252 .await
1253 .has_contact(calling_user_id, called_user_id)
1254 .await?
1255 {
1256 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1257 }
1258
1259 let incoming_call = {
1260 let (room, incoming_call) = &mut *session
1261 .db()
1262 .await
1263 .call(
1264 room_id,
1265 calling_user_id,
1266 calling_connection_id,
1267 called_user_id,
1268 initial_project_id,
1269 )
1270 .await?;
1271 room_updated(&room, &session.peer);
1272 mem::take(incoming_call)
1273 };
1274 update_user_contacts(called_user_id, &session).await?;
1275
1276 let mut calls = session
1277 .connection_pool()
1278 .await
1279 .user_connection_ids(called_user_id)
1280 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1281 .collect::<FuturesUnordered<_>>();
1282
1283 while let Some(call_response) = calls.next().await {
1284 match call_response.as_ref() {
1285 Ok(_) => {
1286 response.send(proto::Ack {})?;
1287 return Ok(());
1288 }
1289 Err(_) => {
1290 call_response.trace_err();
1291 }
1292 }
1293 }
1294
1295 {
1296 let room = session
1297 .db()
1298 .await
1299 .call_failed(room_id, called_user_id)
1300 .await?;
1301 room_updated(&room, &session.peer);
1302 }
1303 update_user_contacts(called_user_id, &session).await?;
1304
1305 Err(anyhow!("failed to ring user"))?
1306}
1307
1308async fn cancel_call(
1309 request: proto::CancelCall,
1310 response: Response<proto::CancelCall>,
1311 session: Session,
1312) -> Result<()> {
1313 let called_user_id = UserId::from_proto(request.called_user_id);
1314 let room_id = RoomId::from_proto(request.room_id);
1315 {
1316 let room = session
1317 .db()
1318 .await
1319 .cancel_call(room_id, session.connection_id, called_user_id)
1320 .await?;
1321 room_updated(&room, &session.peer);
1322 }
1323
1324 for connection_id in session
1325 .connection_pool()
1326 .await
1327 .user_connection_ids(called_user_id)
1328 {
1329 session
1330 .peer
1331 .send(
1332 connection_id,
1333 proto::CallCanceled {
1334 room_id: room_id.to_proto(),
1335 },
1336 )
1337 .trace_err();
1338 }
1339 response.send(proto::Ack {})?;
1340
1341 update_user_contacts(called_user_id, &session).await?;
1342 Ok(())
1343}
1344
1345async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1346 let room_id = RoomId::from_proto(message.room_id);
1347 {
1348 let room = session
1349 .db()
1350 .await
1351 .decline_call(Some(room_id), session.user_id)
1352 .await?
1353 .ok_or_else(|| anyhow!("failed to decline call"))?;
1354 room_updated(&room, &session.peer);
1355 }
1356
1357 for connection_id in session
1358 .connection_pool()
1359 .await
1360 .user_connection_ids(session.user_id)
1361 {
1362 session
1363 .peer
1364 .send(
1365 connection_id,
1366 proto::CallCanceled {
1367 room_id: room_id.to_proto(),
1368 },
1369 )
1370 .trace_err();
1371 }
1372 update_user_contacts(session.user_id, &session).await?;
1373 Ok(())
1374}
1375
1376async fn update_participant_location(
1377 request: proto::UpdateParticipantLocation,
1378 response: Response<proto::UpdateParticipantLocation>,
1379 session: Session,
1380) -> Result<()> {
1381 let room_id = RoomId::from_proto(request.room_id);
1382 let location = request
1383 .location
1384 .ok_or_else(|| anyhow!("invalid location"))?;
1385
1386 let db = session.db().await;
1387 let room = db
1388 .update_room_participant_location(room_id, session.connection_id, location)
1389 .await?;
1390
1391 room_updated(&room, &session.peer);
1392 response.send(proto::Ack {})?;
1393 Ok(())
1394}
1395
1396async fn share_project(
1397 request: proto::ShareProject,
1398 response: Response<proto::ShareProject>,
1399 session: Session,
1400) -> Result<()> {
1401 let (project_id, room) = &*session
1402 .db()
1403 .await
1404 .share_project(
1405 RoomId::from_proto(request.room_id),
1406 session.connection_id,
1407 &request.worktrees,
1408 )
1409 .await?;
1410 response.send(proto::ShareProjectResponse {
1411 project_id: project_id.to_proto(),
1412 })?;
1413 room_updated(&room, &session.peer);
1414
1415 Ok(())
1416}
1417
1418async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1419 let project_id = ProjectId::from_proto(message.project_id);
1420
1421 let (room, guest_connection_ids) = &*session
1422 .db()
1423 .await
1424 .unshare_project(project_id, session.connection_id)
1425 .await?;
1426
1427 broadcast(
1428 Some(session.connection_id),
1429 guest_connection_ids.iter().copied(),
1430 |conn_id| session.peer.send(conn_id, message.clone()),
1431 );
1432 room_updated(&room, &session.peer);
1433
1434 Ok(())
1435}
1436
1437async fn join_project(
1438 request: proto::JoinProject,
1439 response: Response<proto::JoinProject>,
1440 session: Session,
1441) -> Result<()> {
1442 let project_id = ProjectId::from_proto(request.project_id);
1443 let guest_user_id = session.user_id;
1444
1445 tracing::info!(%project_id, "join project");
1446
1447 let (project, replica_id) = &mut *session
1448 .db()
1449 .await
1450 .join_project(project_id, session.connection_id)
1451 .await?;
1452
1453 let collaborators = project
1454 .collaborators
1455 .iter()
1456 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1457 .map(|collaborator| collaborator.to_proto())
1458 .collect::<Vec<_>>();
1459
1460 let worktrees = project
1461 .worktrees
1462 .iter()
1463 .map(|(id, worktree)| proto::WorktreeMetadata {
1464 id: *id,
1465 root_name: worktree.root_name.clone(),
1466 visible: worktree.visible,
1467 abs_path: worktree.abs_path.clone(),
1468 })
1469 .collect::<Vec<_>>();
1470
1471 for collaborator in &collaborators {
1472 session
1473 .peer
1474 .send(
1475 collaborator.peer_id.unwrap().into(),
1476 proto::AddProjectCollaborator {
1477 project_id: project_id.to_proto(),
1478 collaborator: Some(proto::Collaborator {
1479 peer_id: Some(session.connection_id.into()),
1480 replica_id: replica_id.0 as u32,
1481 user_id: guest_user_id.to_proto(),
1482 }),
1483 },
1484 )
1485 .trace_err();
1486 }
1487
1488 // First, we send the metadata associated with each worktree.
1489 response.send(proto::JoinProjectResponse {
1490 worktrees: worktrees.clone(),
1491 replica_id: replica_id.0 as u32,
1492 collaborators: collaborators.clone(),
1493 language_servers: project.language_servers.clone(),
1494 })?;
1495
1496 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1497 #[cfg(any(test, feature = "test-support"))]
1498 const MAX_CHUNK_SIZE: usize = 2;
1499 #[cfg(not(any(test, feature = "test-support")))]
1500 const MAX_CHUNK_SIZE: usize = 256;
1501
1502 // Stream this worktree's entries.
1503 let message = proto::UpdateWorktree {
1504 project_id: project_id.to_proto(),
1505 worktree_id,
1506 abs_path: worktree.abs_path.clone(),
1507 root_name: worktree.root_name,
1508 updated_entries: worktree.entries,
1509 removed_entries: Default::default(),
1510 scan_id: worktree.scan_id,
1511 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1512 updated_repositories: worktree.repository_entries.into_values().collect(),
1513 removed_repositories: Default::default(),
1514 };
1515 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1516 session.peer.send(session.connection_id, update.clone())?;
1517 }
1518
1519 // Stream this worktree's diagnostics.
1520 for summary in worktree.diagnostic_summaries {
1521 session.peer.send(
1522 session.connection_id,
1523 proto::UpdateDiagnosticSummary {
1524 project_id: project_id.to_proto(),
1525 worktree_id: worktree.id,
1526 summary: Some(summary),
1527 },
1528 )?;
1529 }
1530
1531 for settings_file in worktree.settings_files {
1532 session.peer.send(
1533 session.connection_id,
1534 proto::UpdateWorktreeSettings {
1535 project_id: project_id.to_proto(),
1536 worktree_id: worktree.id,
1537 path: settings_file.path,
1538 content: Some(settings_file.content),
1539 },
1540 )?;
1541 }
1542 }
1543
1544 for language_server in &project.language_servers {
1545 session.peer.send(
1546 session.connection_id,
1547 proto::UpdateLanguageServer {
1548 project_id: project_id.to_proto(),
1549 language_server_id: language_server.id,
1550 variant: Some(
1551 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1552 proto::LspDiskBasedDiagnosticsUpdated {},
1553 ),
1554 ),
1555 },
1556 )?;
1557 }
1558
1559 Ok(())
1560}
1561
1562async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1563 let sender_id = session.connection_id;
1564 let project_id = ProjectId::from_proto(request.project_id);
1565
1566 let (room, project) = &*session
1567 .db()
1568 .await
1569 .leave_project(project_id, sender_id)
1570 .await?;
1571 tracing::info!(
1572 %project_id,
1573 host_user_id = %project.host_user_id,
1574 host_connection_id = %project.host_connection_id,
1575 "leave project"
1576 );
1577
1578 project_left(&project, &session);
1579 room_updated(&room, &session.peer);
1580
1581 Ok(())
1582}
1583
1584async fn update_project(
1585 request: proto::UpdateProject,
1586 response: Response<proto::UpdateProject>,
1587 session: Session,
1588) -> Result<()> {
1589 let project_id = ProjectId::from_proto(request.project_id);
1590 let (room, guest_connection_ids) = &*session
1591 .db()
1592 .await
1593 .update_project(project_id, session.connection_id, &request.worktrees)
1594 .await?;
1595 broadcast(
1596 Some(session.connection_id),
1597 guest_connection_ids.iter().copied(),
1598 |connection_id| {
1599 session
1600 .peer
1601 .forward_send(session.connection_id, connection_id, request.clone())
1602 },
1603 );
1604 room_updated(&room, &session.peer);
1605 response.send(proto::Ack {})?;
1606
1607 Ok(())
1608}
1609
1610async fn update_worktree(
1611 request: proto::UpdateWorktree,
1612 response: Response<proto::UpdateWorktree>,
1613 session: Session,
1614) -> Result<()> {
1615 let guest_connection_ids = session
1616 .db()
1617 .await
1618 .update_worktree(&request, session.connection_id)
1619 .await?;
1620
1621 broadcast(
1622 Some(session.connection_id),
1623 guest_connection_ids.iter().copied(),
1624 |connection_id| {
1625 session
1626 .peer
1627 .forward_send(session.connection_id, connection_id, request.clone())
1628 },
1629 );
1630 response.send(proto::Ack {})?;
1631 Ok(())
1632}
1633
1634async fn update_diagnostic_summary(
1635 message: proto::UpdateDiagnosticSummary,
1636 session: Session,
1637) -> Result<()> {
1638 let guest_connection_ids = session
1639 .db()
1640 .await
1641 .update_diagnostic_summary(&message, session.connection_id)
1642 .await?;
1643
1644 broadcast(
1645 Some(session.connection_id),
1646 guest_connection_ids.iter().copied(),
1647 |connection_id| {
1648 session
1649 .peer
1650 .forward_send(session.connection_id, connection_id, message.clone())
1651 },
1652 );
1653
1654 Ok(())
1655}
1656
1657async fn update_worktree_settings(
1658 message: proto::UpdateWorktreeSettings,
1659 session: Session,
1660) -> Result<()> {
1661 let guest_connection_ids = session
1662 .db()
1663 .await
1664 .update_worktree_settings(&message, session.connection_id)
1665 .await?;
1666
1667 broadcast(
1668 Some(session.connection_id),
1669 guest_connection_ids.iter().copied(),
1670 |connection_id| {
1671 session
1672 .peer
1673 .forward_send(session.connection_id, connection_id, message.clone())
1674 },
1675 );
1676
1677 Ok(())
1678}
1679
1680async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1681 broadcast_project_message(request.project_id, request, session).await
1682}
1683
1684async fn start_language_server(
1685 request: proto::StartLanguageServer,
1686 session: Session,
1687) -> Result<()> {
1688 let guest_connection_ids = session
1689 .db()
1690 .await
1691 .start_language_server(&request, session.connection_id)
1692 .await?;
1693
1694 broadcast(
1695 Some(session.connection_id),
1696 guest_connection_ids.iter().copied(),
1697 |connection_id| {
1698 session
1699 .peer
1700 .forward_send(session.connection_id, connection_id, request.clone())
1701 },
1702 );
1703 Ok(())
1704}
1705
1706async fn update_language_server(
1707 request: proto::UpdateLanguageServer,
1708 session: Session,
1709) -> Result<()> {
1710 session.executor.record_backtrace();
1711 let project_id = ProjectId::from_proto(request.project_id);
1712 let project_connection_ids = session
1713 .db()
1714 .await
1715 .project_connection_ids(project_id, session.connection_id)
1716 .await?;
1717 broadcast(
1718 Some(session.connection_id),
1719 project_connection_ids.iter().copied(),
1720 |connection_id| {
1721 session
1722 .peer
1723 .forward_send(session.connection_id, connection_id, request.clone())
1724 },
1725 );
1726 Ok(())
1727}
1728
1729async fn forward_project_request<T>(
1730 request: T,
1731 response: Response<T>,
1732 session: Session,
1733) -> Result<()>
1734where
1735 T: EntityMessage + RequestMessage,
1736{
1737 session.executor.record_backtrace();
1738 let project_id = ProjectId::from_proto(request.remote_entity_id());
1739 let host_connection_id = {
1740 let collaborators = session
1741 .db()
1742 .await
1743 .project_collaborators(project_id, session.connection_id)
1744 .await?;
1745 collaborators
1746 .iter()
1747 .find(|collaborator| collaborator.is_host)
1748 .ok_or_else(|| anyhow!("host not found"))?
1749 .connection_id
1750 };
1751
1752 let payload = session
1753 .peer
1754 .forward_request(session.connection_id, host_connection_id, request)
1755 .await?;
1756
1757 response.send(payload)?;
1758 Ok(())
1759}
1760
1761async fn create_buffer_for_peer(
1762 request: proto::CreateBufferForPeer,
1763 session: Session,
1764) -> Result<()> {
1765 session.executor.record_backtrace();
1766 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1767 session
1768 .peer
1769 .forward_send(session.connection_id, peer_id.into(), request)?;
1770 Ok(())
1771}
1772
1773async fn update_buffer(
1774 request: proto::UpdateBuffer,
1775 response: Response<proto::UpdateBuffer>,
1776 session: Session,
1777) -> Result<()> {
1778 session.executor.record_backtrace();
1779 let project_id = ProjectId::from_proto(request.project_id);
1780 let mut guest_connection_ids;
1781 let mut host_connection_id = None;
1782 {
1783 let collaborators = session
1784 .db()
1785 .await
1786 .project_collaborators(project_id, session.connection_id)
1787 .await?;
1788 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1789 for collaborator in collaborators.iter() {
1790 if collaborator.is_host {
1791 host_connection_id = Some(collaborator.connection_id);
1792 } else {
1793 guest_connection_ids.push(collaborator.connection_id);
1794 }
1795 }
1796 }
1797 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1798
1799 session.executor.record_backtrace();
1800 broadcast(
1801 Some(session.connection_id),
1802 guest_connection_ids,
1803 |connection_id| {
1804 session
1805 .peer
1806 .forward_send(session.connection_id, connection_id, request.clone())
1807 },
1808 );
1809 if host_connection_id != session.connection_id {
1810 session
1811 .peer
1812 .forward_request(session.connection_id, host_connection_id, request.clone())
1813 .await?;
1814 }
1815
1816 response.send(proto::Ack {})?;
1817 Ok(())
1818}
1819
1820async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1821 let project_id = ProjectId::from_proto(request.project_id);
1822 let project_connection_ids = session
1823 .db()
1824 .await
1825 .project_connection_ids(project_id, session.connection_id)
1826 .await?;
1827
1828 broadcast(
1829 Some(session.connection_id),
1830 project_connection_ids.iter().copied(),
1831 |connection_id| {
1832 session
1833 .peer
1834 .forward_send(session.connection_id, connection_id, request.clone())
1835 },
1836 );
1837 Ok(())
1838}
1839
1840async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1841 let project_id = ProjectId::from_proto(request.project_id);
1842 let project_connection_ids = session
1843 .db()
1844 .await
1845 .project_connection_ids(project_id, session.connection_id)
1846 .await?;
1847 broadcast(
1848 Some(session.connection_id),
1849 project_connection_ids.iter().copied(),
1850 |connection_id| {
1851 session
1852 .peer
1853 .forward_send(session.connection_id, connection_id, request.clone())
1854 },
1855 );
1856 Ok(())
1857}
1858
1859async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1860 broadcast_project_message(request.project_id, request, session).await
1861}
1862
1863async fn broadcast_project_message<T: EnvelopedMessage>(
1864 project_id: u64,
1865 request: T,
1866 session: Session,
1867) -> Result<()> {
1868 let project_id = ProjectId::from_proto(project_id);
1869 let project_connection_ids = session
1870 .db()
1871 .await
1872 .project_connection_ids(project_id, session.connection_id)
1873 .await?;
1874 broadcast(
1875 Some(session.connection_id),
1876 project_connection_ids.iter().copied(),
1877 |connection_id| {
1878 session
1879 .peer
1880 .forward_send(session.connection_id, connection_id, request.clone())
1881 },
1882 );
1883 Ok(())
1884}
1885
1886async fn follow(
1887 request: proto::Follow,
1888 response: Response<proto::Follow>,
1889 session: Session,
1890) -> Result<()> {
1891 let project_id = ProjectId::from_proto(request.project_id);
1892 let leader_id = request
1893 .leader_id
1894 .ok_or_else(|| anyhow!("invalid leader id"))?
1895 .into();
1896 let follower_id = session.connection_id;
1897
1898 {
1899 let project_connection_ids = session
1900 .db()
1901 .await
1902 .project_connection_ids(project_id, session.connection_id)
1903 .await?;
1904
1905 if !project_connection_ids.contains(&leader_id) {
1906 Err(anyhow!("no such peer"))?;
1907 }
1908 }
1909
1910 let mut response_payload = session
1911 .peer
1912 .forward_request(session.connection_id, leader_id, request)
1913 .await?;
1914 response_payload
1915 .views
1916 .retain(|view| view.leader_id != Some(follower_id.into()));
1917 response.send(response_payload)?;
1918
1919 let room = session
1920 .db()
1921 .await
1922 .follow(project_id, leader_id, follower_id)
1923 .await?;
1924 room_updated(&room, &session.peer);
1925
1926 Ok(())
1927}
1928
1929async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1930 let project_id = ProjectId::from_proto(request.project_id);
1931 let leader_id = request
1932 .leader_id
1933 .ok_or_else(|| anyhow!("invalid leader id"))?
1934 .into();
1935 let follower_id = session.connection_id;
1936
1937 if !session
1938 .db()
1939 .await
1940 .project_connection_ids(project_id, session.connection_id)
1941 .await?
1942 .contains(&leader_id)
1943 {
1944 Err(anyhow!("no such peer"))?;
1945 }
1946
1947 session
1948 .peer
1949 .forward_send(session.connection_id, leader_id, request)?;
1950
1951 let room = session
1952 .db()
1953 .await
1954 .unfollow(project_id, leader_id, follower_id)
1955 .await?;
1956 room_updated(&room, &session.peer);
1957
1958 Ok(())
1959}
1960
1961async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1962 let project_id = ProjectId::from_proto(request.project_id);
1963 let project_connection_ids = session
1964 .db
1965 .lock()
1966 .await
1967 .project_connection_ids(project_id, session.connection_id)
1968 .await?;
1969
1970 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1971 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1972 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1973 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1974 });
1975 for follower_peer_id in request.follower_ids.iter().copied() {
1976 let follower_connection_id = follower_peer_id.into();
1977 if project_connection_ids.contains(&follower_connection_id)
1978 && Some(follower_peer_id) != leader_id
1979 {
1980 session.peer.forward_send(
1981 session.connection_id,
1982 follower_connection_id,
1983 request.clone(),
1984 )?;
1985 }
1986 }
1987 Ok(())
1988}
1989
1990async fn get_users(
1991 request: proto::GetUsers,
1992 response: Response<proto::GetUsers>,
1993 session: Session,
1994) -> Result<()> {
1995 let user_ids = request
1996 .user_ids
1997 .into_iter()
1998 .map(UserId::from_proto)
1999 .collect();
2000 let users = session
2001 .db()
2002 .await
2003 .get_users_by_ids(user_ids)
2004 .await?
2005 .into_iter()
2006 .map(|user| proto::User {
2007 id: user.id.to_proto(),
2008 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2009 github_login: user.github_login,
2010 })
2011 .collect();
2012 response.send(proto::UsersResponse { users })?;
2013 Ok(())
2014}
2015
2016async fn fuzzy_search_users(
2017 request: proto::FuzzySearchUsers,
2018 response: Response<proto::FuzzySearchUsers>,
2019 session: Session,
2020) -> Result<()> {
2021 let query = request.query;
2022 let users = match query.len() {
2023 0 => vec![],
2024 1 | 2 => session
2025 .db()
2026 .await
2027 .get_user_by_github_login(&query)
2028 .await?
2029 .into_iter()
2030 .collect(),
2031 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2032 };
2033 let users = users
2034 .into_iter()
2035 .filter(|user| user.id != session.user_id)
2036 .map(|user| proto::User {
2037 id: user.id.to_proto(),
2038 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2039 github_login: user.github_login,
2040 })
2041 .collect();
2042 response.send(proto::UsersResponse { users })?;
2043 Ok(())
2044}
2045
2046async fn request_contact(
2047 request: proto::RequestContact,
2048 response: Response<proto::RequestContact>,
2049 session: Session,
2050) -> Result<()> {
2051 let requester_id = session.user_id;
2052 let responder_id = UserId::from_proto(request.responder_id);
2053 if requester_id == responder_id {
2054 return Err(anyhow!("cannot add yourself as a contact"))?;
2055 }
2056
2057 session
2058 .db()
2059 .await
2060 .send_contact_request(requester_id, responder_id)
2061 .await?;
2062
2063 // Update outgoing contact requests of requester
2064 let mut update = proto::UpdateContacts::default();
2065 update.outgoing_requests.push(responder_id.to_proto());
2066 for connection_id in session
2067 .connection_pool()
2068 .await
2069 .user_connection_ids(requester_id)
2070 {
2071 session.peer.send(connection_id, update.clone())?;
2072 }
2073
2074 // Update incoming contact requests of responder
2075 let mut update = proto::UpdateContacts::default();
2076 update
2077 .incoming_requests
2078 .push(proto::IncomingContactRequest {
2079 requester_id: requester_id.to_proto(),
2080 should_notify: true,
2081 });
2082 for connection_id in session
2083 .connection_pool()
2084 .await
2085 .user_connection_ids(responder_id)
2086 {
2087 session.peer.send(connection_id, update.clone())?;
2088 }
2089
2090 response.send(proto::Ack {})?;
2091 Ok(())
2092}
2093
2094async fn respond_to_contact_request(
2095 request: proto::RespondToContactRequest,
2096 response: Response<proto::RespondToContactRequest>,
2097 session: Session,
2098) -> Result<()> {
2099 let responder_id = session.user_id;
2100 let requester_id = UserId::from_proto(request.requester_id);
2101 let db = session.db().await;
2102 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2103 db.dismiss_contact_notification(responder_id, requester_id)
2104 .await?;
2105 } else {
2106 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2107
2108 db.respond_to_contact_request(responder_id, requester_id, accept)
2109 .await?;
2110 let requester_busy = db.is_user_busy(requester_id).await?;
2111 let responder_busy = db.is_user_busy(responder_id).await?;
2112
2113 let pool = session.connection_pool().await;
2114 // Update responder with new contact
2115 let mut update = proto::UpdateContacts::default();
2116 if accept {
2117 update
2118 .contacts
2119 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2120 }
2121 update
2122 .remove_incoming_requests
2123 .push(requester_id.to_proto());
2124 for connection_id in pool.user_connection_ids(responder_id) {
2125 session.peer.send(connection_id, update.clone())?;
2126 }
2127
2128 // Update requester with new contact
2129 let mut update = proto::UpdateContacts::default();
2130 if accept {
2131 update
2132 .contacts
2133 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2134 }
2135 update
2136 .remove_outgoing_requests
2137 .push(responder_id.to_proto());
2138 for connection_id in pool.user_connection_ids(requester_id) {
2139 session.peer.send(connection_id, update.clone())?;
2140 }
2141 }
2142
2143 response.send(proto::Ack {})?;
2144 Ok(())
2145}
2146
2147async fn remove_contact(
2148 request: proto::RemoveContact,
2149 response: Response<proto::RemoveContact>,
2150 session: Session,
2151) -> Result<()> {
2152 let requester_id = session.user_id;
2153 let responder_id = UserId::from_proto(request.user_id);
2154 let db = session.db().await;
2155 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2156
2157 let pool = session.connection_pool().await;
2158 // Update outgoing contact requests of requester
2159 let mut update = proto::UpdateContacts::default();
2160 if contact_accepted {
2161 update.remove_contacts.push(responder_id.to_proto());
2162 } else {
2163 update
2164 .remove_outgoing_requests
2165 .push(responder_id.to_proto());
2166 }
2167 for connection_id in pool.user_connection_ids(requester_id) {
2168 session.peer.send(connection_id, update.clone())?;
2169 }
2170
2171 // Update incoming contact requests of responder
2172 let mut update = proto::UpdateContacts::default();
2173 if contact_accepted {
2174 update.remove_contacts.push(requester_id.to_proto());
2175 } else {
2176 update
2177 .remove_incoming_requests
2178 .push(requester_id.to_proto());
2179 }
2180 for connection_id in pool.user_connection_ids(responder_id) {
2181 session.peer.send(connection_id, update.clone())?;
2182 }
2183
2184 response.send(proto::Ack {})?;
2185 Ok(())
2186}
2187
2188async fn create_channel(
2189 request: proto::CreateChannel,
2190 response: Response<proto::CreateChannel>,
2191 session: Session,
2192) -> Result<()> {
2193 let db = session.db().await;
2194 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2195
2196 if let Some(live_kit) = session.live_kit_client.as_ref() {
2197 live_kit.create_room(live_kit_room.clone()).await?;
2198 }
2199
2200 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2201 let id = db
2202 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2203 .await?;
2204
2205 let channel = proto::Channel {
2206 id: id.to_proto(),
2207 name: request.name,
2208 parent_id: request.parent_id,
2209 };
2210
2211 response.send(proto::ChannelResponse {
2212 channel: Some(channel.clone()),
2213 })?;
2214
2215 let mut update = proto::UpdateChannels::default();
2216 update.channels.push(channel);
2217
2218 let user_ids_to_notify = if let Some(parent_id) = parent_id {
2219 db.get_channel_members(parent_id).await?
2220 } else {
2221 vec![session.user_id]
2222 };
2223
2224 let connection_pool = session.connection_pool().await;
2225 for user_id in user_ids_to_notify {
2226 for connection_id in connection_pool.user_connection_ids(user_id) {
2227 let mut update = update.clone();
2228 if user_id == session.user_id {
2229 update.channel_permissions.push(proto::ChannelPermission {
2230 channel_id: id.to_proto(),
2231 is_admin: true,
2232 });
2233 }
2234 session.peer.send(connection_id, update)?;
2235 }
2236 }
2237
2238 Ok(())
2239}
2240
2241async fn remove_channel(
2242 request: proto::RemoveChannel,
2243 response: Response<proto::RemoveChannel>,
2244 session: Session,
2245) -> Result<()> {
2246 let db = session.db().await;
2247
2248 let channel_id = request.channel_id;
2249 let (removed_channels, member_ids) = db
2250 .remove_channel(ChannelId::from_proto(channel_id), session.user_id)
2251 .await?;
2252 response.send(proto::Ack {})?;
2253
2254 // Notify members of removed channels
2255 let mut update = proto::UpdateChannels::default();
2256 update
2257 .remove_channels
2258 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2259
2260 let connection_pool = session.connection_pool().await;
2261 for member_id in member_ids {
2262 for connection_id in connection_pool.user_connection_ids(member_id) {
2263 session.peer.send(connection_id, update.clone())?;
2264 }
2265 }
2266
2267 Ok(())
2268}
2269
2270async fn invite_channel_member(
2271 request: proto::InviteChannelMember,
2272 response: Response<proto::InviteChannelMember>,
2273 session: Session,
2274) -> Result<()> {
2275 let db = session.db().await;
2276 let channel_id = ChannelId::from_proto(request.channel_id);
2277 let invitee_id = UserId::from_proto(request.user_id);
2278 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2279 .await?;
2280
2281 let (channel, _) = db
2282 .get_channel(channel_id, session.user_id)
2283 .await?
2284 .ok_or_else(|| anyhow!("channel not found"))?;
2285
2286 let mut update = proto::UpdateChannels::default();
2287 update.channel_invitations.push(proto::Channel {
2288 id: channel.id.to_proto(),
2289 name: channel.name,
2290 parent_id: None,
2291 });
2292 for connection_id in session
2293 .connection_pool()
2294 .await
2295 .user_connection_ids(invitee_id)
2296 {
2297 session.peer.send(connection_id, update.clone())?;
2298 }
2299
2300 response.send(proto::Ack {})?;
2301 Ok(())
2302}
2303
2304async fn remove_channel_member(
2305 request: proto::RemoveChannelMember,
2306 response: Response<proto::RemoveChannelMember>,
2307 session: Session,
2308) -> Result<()> {
2309 let db = session.db().await;
2310 let channel_id = ChannelId::from_proto(request.channel_id);
2311 let member_id = UserId::from_proto(request.user_id);
2312
2313 db.remove_channel_member(channel_id, member_id, session.user_id)
2314 .await?;
2315
2316 let mut update = proto::UpdateChannels::default();
2317 update.remove_channels.push(channel_id.to_proto());
2318
2319 for connection_id in session
2320 .connection_pool()
2321 .await
2322 .user_connection_ids(member_id)
2323 {
2324 session.peer.send(connection_id, update.clone())?;
2325 }
2326
2327 response.send(proto::Ack {})?;
2328 Ok(())
2329}
2330
2331async fn set_channel_member_admin(
2332 request: proto::SetChannelMemberAdmin,
2333 response: Response<proto::SetChannelMemberAdmin>,
2334 session: Session,
2335) -> Result<()> {
2336 let db = session.db().await;
2337 let channel_id = ChannelId::from_proto(request.channel_id);
2338 let member_id = UserId::from_proto(request.user_id);
2339 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2340 .await?;
2341
2342 let (channel, has_accepted) = db
2343 .get_channel(channel_id, member_id)
2344 .await?
2345 .ok_or_else(|| anyhow!("channel not found"))?;
2346
2347 let mut update = proto::UpdateChannels::default();
2348 if has_accepted {
2349 update.channel_permissions.push(proto::ChannelPermission {
2350 channel_id: channel.id.to_proto(),
2351 is_admin: request.admin,
2352 });
2353 }
2354
2355 for connection_id in session
2356 .connection_pool()
2357 .await
2358 .user_connection_ids(member_id)
2359 {
2360 session.peer.send(connection_id, update.clone())?;
2361 }
2362
2363 response.send(proto::Ack {})?;
2364 Ok(())
2365}
2366
2367async fn rename_channel(
2368 request: proto::RenameChannel,
2369 response: Response<proto::RenameChannel>,
2370 session: Session,
2371) -> Result<()> {
2372 let db = session.db().await;
2373 let channel_id = ChannelId::from_proto(request.channel_id);
2374 let new_name = db
2375 .rename_channel(channel_id, session.user_id, &request.name)
2376 .await?;
2377
2378 let channel = proto::Channel {
2379 id: request.channel_id,
2380 name: new_name,
2381 parent_id: None,
2382 };
2383 response.send(proto::ChannelResponse {
2384 channel: Some(channel.clone()),
2385 })?;
2386 let mut update = proto::UpdateChannels::default();
2387 update.channels.push(channel);
2388
2389 let member_ids = db.get_channel_members(channel_id).await?;
2390
2391 let connection_pool = session.connection_pool().await;
2392 for member_id in member_ids {
2393 for connection_id in connection_pool.user_connection_ids(member_id) {
2394 session.peer.send(connection_id, update.clone())?;
2395 }
2396 }
2397
2398 Ok(())
2399}
2400
2401async fn get_channel_members(
2402 request: proto::GetChannelMembers,
2403 response: Response<proto::GetChannelMembers>,
2404 session: Session,
2405) -> Result<()> {
2406 let db = session.db().await;
2407 let channel_id = ChannelId::from_proto(request.channel_id);
2408 let members = db
2409 .get_channel_member_details(channel_id, session.user_id)
2410 .await?;
2411 response.send(proto::GetChannelMembersResponse { members })?;
2412 Ok(())
2413}
2414
2415async fn respond_to_channel_invite(
2416 request: proto::RespondToChannelInvite,
2417 response: Response<proto::RespondToChannelInvite>,
2418 session: Session,
2419) -> Result<()> {
2420 let db = session.db().await;
2421 let channel_id = ChannelId::from_proto(request.channel_id);
2422 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2423 .await?;
2424
2425 let mut update = proto::UpdateChannels::default();
2426 update
2427 .remove_channel_invitations
2428 .push(channel_id.to_proto());
2429 if request.accept {
2430 let result = db.get_channels_for_user(session.user_id).await?;
2431 update
2432 .channels
2433 .extend(result.channels.into_iter().map(|channel| proto::Channel {
2434 id: channel.id.to_proto(),
2435 name: channel.name,
2436 parent_id: channel.parent_id.map(ChannelId::to_proto),
2437 }));
2438 update
2439 .channel_participants
2440 .extend(
2441 result
2442 .channel_participants
2443 .into_iter()
2444 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2445 channel_id: channel_id.to_proto(),
2446 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2447 }),
2448 );
2449 update
2450 .channel_permissions
2451 .extend(
2452 result
2453 .channels_with_admin_privileges
2454 .into_iter()
2455 .map(|channel_id| proto::ChannelPermission {
2456 channel_id: channel_id.to_proto(),
2457 is_admin: true,
2458 }),
2459 );
2460 }
2461 session.peer.send(session.connection_id, update)?;
2462 response.send(proto::Ack {})?;
2463
2464 Ok(())
2465}
2466
2467async fn join_channel(
2468 request: proto::JoinChannel,
2469 response: Response<proto::JoinChannel>,
2470 session: Session,
2471) -> Result<()> {
2472 let channel_id = ChannelId::from_proto(request.channel_id);
2473
2474 let joined_room = {
2475 leave_room_for_session(&session).await?;
2476 let db = session.db().await;
2477
2478 let room_id = db.room_id_for_channel(channel_id).await?;
2479
2480 let joined_room = db
2481 .join_room(room_id, session.user_id, session.connection_id)
2482 .await?;
2483
2484 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2485 let token = live_kit
2486 .room_token(
2487 &joined_room.room.live_kit_room,
2488 &session.user_id.to_string(),
2489 )
2490 .trace_err()?;
2491
2492 Some(LiveKitConnectionInfo {
2493 server_url: live_kit.url().into(),
2494 token,
2495 })
2496 });
2497
2498 response.send(proto::JoinRoomResponse {
2499 room: Some(joined_room.room.clone()),
2500 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2501 live_kit_connection_info,
2502 })?;
2503
2504 room_updated(&joined_room.room, &session.peer);
2505
2506 joined_room.into_inner()
2507 };
2508
2509 channel_updated(
2510 channel_id,
2511 &joined_room.room,
2512 &joined_room.channel_members,
2513 &session.peer,
2514 &*session.connection_pool().await,
2515 );
2516
2517 update_user_contacts(session.user_id, &session).await?;
2518
2519 Ok(())
2520}
2521
2522async fn join_channel_buffer(
2523 request: proto::JoinChannelBuffer,
2524 response: Response<proto::JoinChannelBuffer>,
2525 session: Session,
2526) -> Result<()> {
2527 let db = session.db().await;
2528 let channel_id = ChannelId::from_proto(request.channel_id);
2529
2530 let open_response = db
2531 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2532 .await?;
2533
2534 let replica_id = open_response.replica_id;
2535 let collaborators = open_response.collaborators.clone();
2536
2537 response.send(open_response)?;
2538
2539 let update = AddChannelBufferCollaborator {
2540 channel_id: channel_id.to_proto(),
2541 collaborator: Some(proto::Collaborator {
2542 user_id: session.user_id.to_proto(),
2543 peer_id: Some(session.connection_id.into()),
2544 replica_id,
2545 }),
2546 };
2547 channel_buffer_updated(
2548 session.connection_id,
2549 collaborators
2550 .iter()
2551 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2552 &update,
2553 &session.peer,
2554 );
2555
2556 Ok(())
2557}
2558
2559async fn update_channel_buffer(
2560 request: proto::UpdateChannelBuffer,
2561 session: Session,
2562) -> Result<()> {
2563 let db = session.db().await;
2564 let channel_id = ChannelId::from_proto(request.channel_id);
2565
2566 let collaborators = db
2567 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2568 .await?;
2569
2570 channel_buffer_updated(
2571 session.connection_id,
2572 collaborators,
2573 &proto::UpdateChannelBuffer {
2574 channel_id: channel_id.to_proto(),
2575 operations: request.operations,
2576 },
2577 &session.peer,
2578 );
2579 Ok(())
2580}
2581
2582async fn rejoin_channel_buffers(
2583 request: proto::RejoinChannelBuffers,
2584 response: Response<proto::RejoinChannelBuffers>,
2585 session: Session,
2586) -> Result<()> {
2587 let db = session.db().await;
2588 let buffers = db
2589 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2590 .await?;
2591
2592 for buffer in &buffers {
2593 let collaborators_to_notify = buffer
2594 .buffer
2595 .collaborators
2596 .iter()
2597 .filter_map(|c| Some(c.peer_id?.into()));
2598 channel_buffer_updated(
2599 session.connection_id,
2600 collaborators_to_notify,
2601 &proto::UpdateChannelBufferCollaborator {
2602 channel_id: buffer.buffer.channel_id,
2603 old_peer_id: Some(buffer.old_connection_id.into()),
2604 new_peer_id: Some(session.connection_id.into()),
2605 },
2606 &session.peer,
2607 );
2608 }
2609
2610 response.send(proto::RejoinChannelBuffersResponse {
2611 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2612 })?;
2613
2614 Ok(())
2615}
2616
2617async fn leave_channel_buffer(
2618 request: proto::LeaveChannelBuffer,
2619 response: Response<proto::LeaveChannelBuffer>,
2620 session: Session,
2621) -> Result<()> {
2622 let db = session.db().await;
2623 let channel_id = ChannelId::from_proto(request.channel_id);
2624
2625 let collaborators_to_notify = db
2626 .leave_channel_buffer(channel_id, session.connection_id)
2627 .await?;
2628
2629 response.send(Ack {})?;
2630
2631 channel_buffer_updated(
2632 session.connection_id,
2633 collaborators_to_notify,
2634 &proto::RemoveChannelBufferCollaborator {
2635 channel_id: channel_id.to_proto(),
2636 peer_id: Some(session.connection_id.into()),
2637 },
2638 &session.peer,
2639 );
2640
2641 Ok(())
2642}
2643
2644fn channel_buffer_updated<T: EnvelopedMessage>(
2645 sender_id: ConnectionId,
2646 collaborators: impl IntoIterator<Item = ConnectionId>,
2647 message: &T,
2648 peer: &Peer,
2649) {
2650 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2651 peer.send(peer_id.into(), message.clone())
2652 });
2653}
2654
2655async fn send_channel_message(
2656 request: proto::SendChannelMessage,
2657 response: Response<proto::SendChannelMessage>,
2658 session: Session,
2659) -> Result<()> {
2660 // Validate the message body.
2661 let body = request.body.trim().to_string();
2662 if body.len() > MAX_MESSAGE_LEN {
2663 return Err(anyhow!("message is too long"))?;
2664 }
2665 if body.is_empty() {
2666 return Err(anyhow!("message can't be blank"))?;
2667 }
2668
2669 let timestamp = OffsetDateTime::now_utc();
2670 let nonce = request
2671 .nonce
2672 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2673
2674 let channel_id = ChannelId::from_proto(request.channel_id);
2675 let (message_id, connection_ids) = session
2676 .db()
2677 .await
2678 .create_channel_message(
2679 channel_id,
2680 session.user_id,
2681 &body,
2682 timestamp,
2683 nonce.clone().into(),
2684 )
2685 .await?;
2686 let message = proto::ChannelMessage {
2687 sender_id: session.user_id.to_proto(),
2688 id: message_id.to_proto(),
2689 body,
2690 timestamp: timestamp.unix_timestamp() as u64,
2691 nonce: Some(nonce),
2692 };
2693 broadcast(Some(session.connection_id), connection_ids, |connection| {
2694 session.peer.send(
2695 connection,
2696 proto::ChannelMessageSent {
2697 channel_id: channel_id.to_proto(),
2698 message: Some(message.clone()),
2699 },
2700 )
2701 });
2702 response.send(proto::SendChannelMessageResponse {
2703 message: Some(message),
2704 })?;
2705 Ok(())
2706}
2707
2708async fn join_channel_chat(
2709 request: proto::JoinChannelChat,
2710 response: Response<proto::JoinChannelChat>,
2711 session: Session,
2712) -> Result<()> {
2713 let channel_id = ChannelId::from_proto(request.channel_id);
2714
2715 let db = session.db().await;
2716 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2717 .await?;
2718 let messages = db
2719 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2720 .await?;
2721 response.send(proto::JoinChannelChatResponse {
2722 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2723 messages,
2724 })?;
2725 Ok(())
2726}
2727
2728async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2729 let channel_id = ChannelId::from_proto(request.channel_id);
2730 session
2731 .db()
2732 .await
2733 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2734 .await?;
2735 Ok(())
2736}
2737
2738async fn get_channel_messages(
2739 request: proto::GetChannelMessages,
2740 response: Response<proto::GetChannelMessages>,
2741 session: Session,
2742) -> Result<()> {
2743 let channel_id = ChannelId::from_proto(request.channel_id);
2744 let messages = session
2745 .db()
2746 .await
2747 .get_channel_messages(
2748 channel_id,
2749 session.user_id,
2750 MESSAGE_COUNT_PER_PAGE,
2751 Some(MessageId::from_proto(request.before_message_id)),
2752 )
2753 .await?;
2754 response.send(proto::GetChannelMessagesResponse {
2755 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2756 messages,
2757 })?;
2758 Ok(())
2759}
2760
2761async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2762 let project_id = ProjectId::from_proto(request.project_id);
2763 let project_connection_ids = session
2764 .db()
2765 .await
2766 .project_connection_ids(project_id, session.connection_id)
2767 .await?;
2768 broadcast(
2769 Some(session.connection_id),
2770 project_connection_ids.iter().copied(),
2771 |connection_id| {
2772 session
2773 .peer
2774 .forward_send(session.connection_id, connection_id, request.clone())
2775 },
2776 );
2777 Ok(())
2778}
2779
2780async fn get_private_user_info(
2781 _request: proto::GetPrivateUserInfo,
2782 response: Response<proto::GetPrivateUserInfo>,
2783 session: Session,
2784) -> Result<()> {
2785 let db = session.db().await;
2786
2787 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2788 let user = db
2789 .get_user_by_id(session.user_id)
2790 .await?
2791 .ok_or_else(|| anyhow!("user not found"))?;
2792 let flags = db.get_user_flags(session.user_id).await?;
2793
2794 response.send(proto::GetPrivateUserInfoResponse {
2795 metrics_id,
2796 staff: user.admin,
2797 flags,
2798 })?;
2799 Ok(())
2800}
2801
2802fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2803 match message {
2804 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2805 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2806 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2807 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2808 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2809 code: frame.code.into(),
2810 reason: frame.reason,
2811 })),
2812 }
2813}
2814
2815fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2816 match message {
2817 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2818 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2819 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2820 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2821 AxumMessage::Close(frame) => {
2822 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2823 code: frame.code.into(),
2824 reason: frame.reason,
2825 }))
2826 }
2827 }
2828}
2829
2830fn build_initial_channels_update(
2831 channels: ChannelsForUser,
2832 channel_invites: Vec<db::Channel>,
2833) -> proto::UpdateChannels {
2834 let mut update = proto::UpdateChannels::default();
2835
2836 for channel in channels.channels {
2837 update.channels.push(proto::Channel {
2838 id: channel.id.to_proto(),
2839 name: channel.name,
2840 parent_id: channel.parent_id.map(|id| id.to_proto()),
2841 });
2842 }
2843
2844 for (channel_id, participants) in channels.channel_participants {
2845 update
2846 .channel_participants
2847 .push(proto::ChannelParticipants {
2848 channel_id: channel_id.to_proto(),
2849 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2850 });
2851 }
2852
2853 update
2854 .channel_permissions
2855 .extend(
2856 channels
2857 .channels_with_admin_privileges
2858 .into_iter()
2859 .map(|id| proto::ChannelPermission {
2860 channel_id: id.to_proto(),
2861 is_admin: true,
2862 }),
2863 );
2864
2865 for channel in channel_invites {
2866 update.channel_invitations.push(proto::Channel {
2867 id: channel.id.to_proto(),
2868 name: channel.name,
2869 parent_id: None,
2870 });
2871 }
2872
2873 update
2874}
2875
2876fn build_initial_contacts_update(
2877 contacts: Vec<db::Contact>,
2878 pool: &ConnectionPool,
2879) -> proto::UpdateContacts {
2880 let mut update = proto::UpdateContacts::default();
2881
2882 for contact in contacts {
2883 match contact {
2884 db::Contact::Accepted {
2885 user_id,
2886 should_notify,
2887 busy,
2888 } => {
2889 update
2890 .contacts
2891 .push(contact_for_user(user_id, should_notify, busy, &pool));
2892 }
2893 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2894 db::Contact::Incoming {
2895 user_id,
2896 should_notify,
2897 } => update
2898 .incoming_requests
2899 .push(proto::IncomingContactRequest {
2900 requester_id: user_id.to_proto(),
2901 should_notify,
2902 }),
2903 }
2904 }
2905
2906 update
2907}
2908
2909fn contact_for_user(
2910 user_id: UserId,
2911 should_notify: bool,
2912 busy: bool,
2913 pool: &ConnectionPool,
2914) -> proto::Contact {
2915 proto::Contact {
2916 user_id: user_id.to_proto(),
2917 online: pool.is_user_online(user_id),
2918 busy,
2919 should_notify,
2920 }
2921}
2922
2923fn room_updated(room: &proto::Room, peer: &Peer) {
2924 broadcast(
2925 None,
2926 room.participants
2927 .iter()
2928 .filter_map(|participant| Some(participant.peer_id?.into())),
2929 |peer_id| {
2930 peer.send(
2931 peer_id.into(),
2932 proto::RoomUpdated {
2933 room: Some(room.clone()),
2934 },
2935 )
2936 },
2937 );
2938}
2939
2940fn channel_updated(
2941 channel_id: ChannelId,
2942 room: &proto::Room,
2943 channel_members: &[UserId],
2944 peer: &Peer,
2945 pool: &ConnectionPool,
2946) {
2947 let participants = room
2948 .participants
2949 .iter()
2950 .map(|p| p.user_id)
2951 .collect::<Vec<_>>();
2952
2953 broadcast(
2954 None,
2955 channel_members
2956 .iter()
2957 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2958 |peer_id| {
2959 peer.send(
2960 peer_id.into(),
2961 proto::UpdateChannels {
2962 channel_participants: vec![proto::ChannelParticipants {
2963 channel_id: channel_id.to_proto(),
2964 participant_user_ids: participants.clone(),
2965 }],
2966 ..Default::default()
2967 },
2968 )
2969 },
2970 );
2971}
2972
2973async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2974 let db = session.db().await;
2975
2976 let contacts = db.get_contacts(user_id).await?;
2977 let busy = db.is_user_busy(user_id).await?;
2978
2979 let pool = session.connection_pool().await;
2980 let updated_contact = contact_for_user(user_id, false, busy, &pool);
2981 for contact in contacts {
2982 if let db::Contact::Accepted {
2983 user_id: contact_user_id,
2984 ..
2985 } = contact
2986 {
2987 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2988 session
2989 .peer
2990 .send(
2991 contact_conn_id,
2992 proto::UpdateContacts {
2993 contacts: vec![updated_contact.clone()],
2994 remove_contacts: Default::default(),
2995 incoming_requests: Default::default(),
2996 remove_incoming_requests: Default::default(),
2997 outgoing_requests: Default::default(),
2998 remove_outgoing_requests: Default::default(),
2999 },
3000 )
3001 .trace_err();
3002 }
3003 }
3004 }
3005 Ok(())
3006}
3007
3008async fn leave_room_for_session(session: &Session) -> Result<()> {
3009 let mut contacts_to_update = HashSet::default();
3010
3011 let room_id;
3012 let canceled_calls_to_user_ids;
3013 let live_kit_room;
3014 let delete_live_kit_room;
3015 let room;
3016 let channel_members;
3017 let channel_id;
3018
3019 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3020 contacts_to_update.insert(session.user_id);
3021
3022 for project in left_room.left_projects.values() {
3023 project_left(project, session);
3024 }
3025
3026 room_id = RoomId::from_proto(left_room.room.id);
3027 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3028 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3029 delete_live_kit_room = left_room.deleted;
3030 room = mem::take(&mut left_room.room);
3031 channel_members = mem::take(&mut left_room.channel_members);
3032 channel_id = left_room.channel_id;
3033
3034 room_updated(&room, &session.peer);
3035 } else {
3036 return Ok(());
3037 }
3038
3039 if let Some(channel_id) = channel_id {
3040 channel_updated(
3041 channel_id,
3042 &room,
3043 &channel_members,
3044 &session.peer,
3045 &*session.connection_pool().await,
3046 );
3047 }
3048
3049 {
3050 let pool = session.connection_pool().await;
3051 for canceled_user_id in canceled_calls_to_user_ids {
3052 for connection_id in pool.user_connection_ids(canceled_user_id) {
3053 session
3054 .peer
3055 .send(
3056 connection_id,
3057 proto::CallCanceled {
3058 room_id: room_id.to_proto(),
3059 },
3060 )
3061 .trace_err();
3062 }
3063 contacts_to_update.insert(canceled_user_id);
3064 }
3065 }
3066
3067 for contact_user_id in contacts_to_update {
3068 update_user_contacts(contact_user_id, &session).await?;
3069 }
3070
3071 if let Some(live_kit) = session.live_kit_client.as_ref() {
3072 live_kit
3073 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3074 .await
3075 .trace_err();
3076
3077 if delete_live_kit_room {
3078 live_kit.delete_room(live_kit_room).await.trace_err();
3079 }
3080 }
3081
3082 Ok(())
3083}
3084
3085async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3086 let left_channel_buffers = session
3087 .db()
3088 .await
3089 .leave_channel_buffers(session.connection_id)
3090 .await?;
3091
3092 for (channel_id, connections) in left_channel_buffers {
3093 channel_buffer_updated(
3094 session.connection_id,
3095 connections,
3096 &proto::RemoveChannelBufferCollaborator {
3097 channel_id: channel_id.to_proto(),
3098 peer_id: Some(session.connection_id.into()),
3099 },
3100 &session.peer,
3101 );
3102 }
3103
3104 Ok(())
3105}
3106
3107fn project_left(project: &db::LeftProject, session: &Session) {
3108 for connection_id in &project.connection_ids {
3109 if project.host_user_id == session.user_id {
3110 session
3111 .peer
3112 .send(
3113 *connection_id,
3114 proto::UnshareProject {
3115 project_id: project.id.to_proto(),
3116 },
3117 )
3118 .trace_err();
3119 } else {
3120 session
3121 .peer
3122 .send(
3123 *connection_id,
3124 proto::RemoveProjectCollaborator {
3125 project_id: project.id.to_proto(),
3126 peer_id: Some(session.connection_id.into()),
3127 },
3128 )
3129 .trace_err();
3130 }
3131 }
3132}
3133
/// Extension trait for turning a fallible value into an `Option` after
/// reporting the failure.
pub trait ResultExt {
    /// The success type carried by the implementor.
    type Ok;

    /// Returns the success value as `Some`, or reports the error and returns
    /// `None`. How the error is reported is up to the implementation.
    fn trace_err(self) -> Option<Self::Ok>;
}
3139
3140impl<T, E> ResultExt for Result<T, E>
3141where
3142 E: std::fmt::Debug,
3143{
3144 type Ok = T;
3145
3146 fn trace_err(self) -> Option<T> {
3147 match self {
3148 Ok(value) => Some(value),
3149 Err(error) => {
3150 tracing::error!("{:?}", error);
3151 None
3152 }
3153 }
3154 }
3155}