1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
/// How long `connection_lost` sleeps after a connection drops before it releases the
/// resources (room, channel buffers, pending call) held by that connection.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long the background task spawned by `Server::start` sleeps before it looks up and
/// clears stale rooms and channel buffers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);

// NOTE(review): not referenced in the visible part of this file; presumably the page size
// used when listing channel messages — confirm against the handler that reads it.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): not referenced in the visible part of this file; presumably the maximum
// accepted length of a channel chat message — confirm against the handler that reads it.
const MAX_MESSAGE_LEN: usize = 1024;
72
// Prometheus gauges, registered on first access. Registration failure panics via `unwrap`.
lazy_static! {
    // Set by `handle_metrics` to the number of pooled connections whose `admin` flag is false.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Set by `handle_metrics` from the database's `project_count_excluding_admins`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
82
/// Type-erased handler stored in `Server::handlers`, keyed by the message's `TypeId`.
///
/// It receives the envelope as a trait object together with the `Session` of the connection
/// and returns a boxed future with no output; `add_handler` builds these wrappers and logs
/// the handler's result itself.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
86struct Response<R> {
87 peer: Arc<Peer>,
88 receipt: Receipt<R>,
89 responded: Arc<AtomicBool>,
90}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
100#[derive(Clone)]
101struct Session {
102 user_id: UserId,
103 connection_id: ConnectionId,
104 db: Arc<tokio::sync::Mutex<DbHandle>>,
105 peer: Arc<Peer>,
106 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
107 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
108 executor: Executor,
109}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
141struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
/// The RPC server: owns the peer, the connection pool and the table of message handlers.
pub struct Server {
    // Behind a mutex because the test-only `reset` replaces the id through `&self`.
    id: parking_lot::Mutex<ServerId>,
    peer: Arc<Peer>,
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    app_state: Arc<AppState>,
    executor: Executor,
    // One handler per message type, keyed by the message's `TypeId`; filled in by `new`.
    handlers: HashMap<TypeId, MessageHandler>,
    // Signalled by `teardown()`; each connection subscribes to it in `handle_connection`.
    teardown: watch::Sender<()>,
}
160
/// Wrapper around the connection pool's mutex guard.
///
/// It dereferences to `ConnectionPool` and, in test builds, checks the pool's invariants
/// when dropped.
pub(crate) struct ConnectionPoolGuard<'a> {
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    // `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    // NOTE(review): presumably this keeps the guard from being held across an `.await` in a
    // future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
165
/// Serializable view of the server, produced by `Server::snapshot`.
///
/// It holds the connection-pool lock for as long as the snapshot is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    peer: &'a Peer,
    // The guard is serialized through its `Deref` target, i.e. as the `ConnectionPool`.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
/// Creates a server with the given id and registers a handler for every supported message.
///
/// Request handlers (`add_request_handler`) must answer through their `Response`; message
/// handlers (`add_message_handler`) have nothing to answer. Registering two handlers for
/// the same message type panics inside `add_handler`.
pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
    let mut server = Self {
        id: parking_lot::Mutex::new(id),
        // The peer is constructed from the server id, cast to `u32`.
        peer: Peer::new(id.0 as u32),
        app_state,
        executor,
        connection_pool: Default::default(),
        handlers: Default::default(),
        // Only the sender half is kept; receivers are created later with `subscribe()`.
        teardown: watch::channel(()).0,
    };

    server
        .add_request_handler(ping)
        .add_request_handler(create_room)
        .add_request_handler(join_room)
        .add_request_handler(rejoin_room)
        .add_request_handler(leave_room)
        .add_request_handler(call)
        .add_request_handler(cancel_call)
        .add_message_handler(decline_call)
        .add_request_handler(update_participant_location)
        .add_request_handler(share_project)
        .add_message_handler(unshare_project)
        .add_request_handler(join_project)
        .add_message_handler(leave_project)
        .add_request_handler(update_project)
        .add_request_handler(update_worktree)
        .add_message_handler(start_language_server)
        .add_message_handler(update_language_server)
        .add_message_handler(update_diagnostic_summary)
        .add_message_handler(update_worktree_settings)
        .add_message_handler(refresh_inlay_hints)
        .add_request_handler(forward_project_request::<proto::GetHover>)
        .add_request_handler(forward_project_request::<proto::GetDefinition>)
        .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
        .add_request_handler(forward_project_request::<proto::GetReferences>)
        .add_request_handler(forward_project_request::<proto::SearchProject>)
        .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
        .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
        .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
        .add_request_handler(forward_project_request::<proto::OpenBufferById>)
        .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
        .add_request_handler(forward_project_request::<proto::GetCompletions>)
        .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
        .add_request_handler(forward_project_request::<proto::GetCodeActions>)
        .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
        .add_request_handler(forward_project_request::<proto::PrepareRename>)
        .add_request_handler(forward_project_request::<proto::PerformRename>)
        .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
        .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
        .add_request_handler(forward_project_request::<proto::FormatBuffers>)
        .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
        .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
        .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
        .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
        .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
        .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
        .add_request_handler(forward_project_request::<proto::InlayHints>)
        .add_message_handler(create_buffer_for_peer)
        .add_request_handler(update_buffer)
        .add_message_handler(update_buffer_file)
        .add_message_handler(buffer_reloaded)
        .add_message_handler(buffer_saved)
        .add_request_handler(forward_project_request::<proto::SaveBuffer>)
        .add_request_handler(get_users)
        .add_request_handler(fuzzy_search_users)
        .add_request_handler(request_contact)
        .add_request_handler(remove_contact)
        .add_request_handler(respond_to_contact_request)
        .add_request_handler(create_channel)
        .add_request_handler(remove_channel)
        .add_request_handler(invite_channel_member)
        .add_request_handler(remove_channel_member)
        .add_request_handler(set_channel_member_admin)
        .add_request_handler(rename_channel)
        .add_request_handler(join_channel_buffer)
        .add_request_handler(leave_channel_buffer)
        .add_message_handler(update_channel_buffer)
        .add_request_handler(rejoin_channel_buffers)
        .add_request_handler(get_channel_members)
        .add_request_handler(respond_to_channel_invite)
        .add_request_handler(join_channel)
        .add_request_handler(join_channel_chat)
        .add_message_handler(leave_channel_chat)
        .add_request_handler(send_channel_message)
        .add_request_handler(remove_channel_message)
        .add_request_handler(get_channel_messages)
        .add_request_handler(follow)
        .add_message_handler(unfollow)
        .add_message_handler(update_followers)
        .add_message_handler(update_diff_base)
        .add_request_handler(get_private_user_info);

    Arc::new(server)
}
278
/// Spawns a detached background task that cleans up stale server resources.
///
/// The task waits `CLEANUP_TIMEOUT`, asks the database for the stale room and channel ids
/// of this environment and server id, clears them, notifies the affected connections, and
/// finally deletes the stale server records. Database errors inside the task are passed to
/// `trace_err` and do not stop the remaining steps.
///
/// This function only spawns the task; it returns `Ok(())` without waiting for it.
pub async fn start(&self) -> Result<()> {
    let server_id = *self.id.lock();
    let app_state = self.app_state.clone();
    let peer = self.peer.clone();
    // The sleep future is created here and awaited inside the spawned task.
    let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
    let pool = self.connection_pool.clone();
    let live_kit_client = self.app_state.live_kit_client.clone();

    let span = info_span!("start server");
    self.executor.spawn_detached(
        async move {
            tracing::info!("waiting for cleanup timeout");
            timeout.await;
            tracing::info!("cleanup timeout expired, retrieving stale rooms");
            if let Some((room_ids, channel_ids)) = app_state
                .db
                .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
                .await
                .trace_err()
            {
                tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                tracing::info!(
                    stale_channel_buffer_count = channel_ids.len(),
                    "retrieved stale channel buffers"
                );

                // Channel buffers: clear the stale collaborators, then send every removal
                // message to every connection listed for the refreshed buffer.
                for channel_id in channel_ids {
                    if let Some(refreshed_channel_buffer) = app_state
                        .db
                        .clear_stale_channel_buffer_collaborators(channel_id, server_id)
                        .await
                        .trace_err()
                    {
                        for connection_id in refreshed_channel_buffer.connection_ids {
                            for message in &refreshed_channel_buffer.removed_collaborators {
                                peer.send(connection_id, message.clone()).trace_err();
                            }
                        }
                    }
                }

                for room_id in room_ids {
                    // These stay at their empty defaults when clearing the room fails, which
                    // turns the notification steps below into no-ops for this room.
                    let mut contacts_to_update = HashSet::default();
                    let mut canceled_calls_to_user_ids = Vec::new();
                    let mut live_kit_room = String::new();
                    let mut delete_live_kit_room = false;

                    if let Some(mut refreshed_room) = app_state
                        .db
                        .clear_stale_room_participants(room_id, server_id)
                        .await
                        .trace_err()
                    {
                        tracing::info!(
                            room_id = room_id.0,
                            new_participant_count = refreshed_room.room.participants.len(),
                            "refreshed room"
                        );
                        room_updated(&refreshed_room.room, &peer);
                        if let Some(channel_id) = refreshed_room.channel_id {
                            channel_updated(
                                channel_id,
                                &refreshed_room.room,
                                &refreshed_room.channel_members,
                                &peer,
                                &*pool.lock(),
                            );
                        }
                        // Both removed participants and users whose call was canceled get
                        // their contact entry refreshed below.
                        contacts_to_update
                            .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                        contacts_to_update
                            .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                        canceled_calls_to_user_ids =
                            mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                        live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                        // The LiveKit room is deleted only when no participants remain.
                        delete_live_kit_room = refreshed_room.room.participants.is_empty();
                    }

                    // Scoped so the pool lock is released before the next `.await`.
                    {
                        let pool = pool.lock();
                        for canceled_user_id in canceled_calls_to_user_ids {
                            for connection_id in pool.user_connection_ids(canceled_user_id) {
                                peer.send(
                                    connection_id,
                                    proto::CallCanceled {
                                        room_id: room_id.to_proto(),
                                    },
                                )
                                .trace_err();
                            }
                        }
                    }

                    for user_id in contacts_to_update {
                        let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                        let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                        // Skip this user unless both lookups succeeded.
                        if let Some((busy, contacts)) = busy.zip(contacts) {
                            let pool = pool.lock();
                            let updated_contact = contact_for_user(user_id, false, busy, &pool);
                            for contact in contacts {
                                // Only accepted contacts are notified.
                                if let db::Contact::Accepted {
                                    user_id: contact_user_id,
                                    ..
                                } = contact
                                {
                                    for contact_conn_id in
                                        pool.user_connection_ids(contact_user_id)
                                    {
                                        peer.send(
                                            contact_conn_id,
                                            proto::UpdateContacts {
                                                contacts: vec![updated_contact.clone()],
                                                remove_contacts: Default::default(),
                                                incoming_requests: Default::default(),
                                                remove_incoming_requests: Default::default(),
                                                outgoing_requests: Default::default(),
                                                remove_outgoing_requests: Default::default(),
                                            },
                                        )
                                        .trace_err();
                                    }
                                }
                            }
                        }
                    }

                    if let Some(live_kit) = live_kit_client.as_ref() {
                        if delete_live_kit_room {
                            live_kit.delete_room(live_kit_room).await.trace_err();
                        }
                    }
                }
            }

            // Runs even when the stale-resource lookup above failed.
            app_state
                .db
                .delete_stale_servers(&app_state.config.zed_environment, server_id)
                .await
                .trace_err();
        }
        .instrument(span),
    );
    Ok(())
}
423
/// Tears down the peer, resets the connection pool, and then signals every subscriber of
/// the teardown channel (connection loops and pending `connection_lost` calls).
pub fn teardown(&self) {
    self.peer.teardown();
    self.connection_pool.lock().reset();
    // The send result is deliberately ignored.
    let _ = self.teardown.send(());
}
429
/// Test-only: tears the server down, then gives both the server and its peer the new `id`.
#[cfg(test)]
pub fn reset(&self, id: ServerId) {
    self.teardown();
    *self.id.lock() = id;
    self.peer.reset(id.0 as u32);
}
436
/// Test-only: returns a copy of the server's current id.
#[cfg(test)]
pub fn id(&self) -> ServerId {
    *self.id.lock()
}
441
442 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
443 where
444 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
445 Fut: 'static + Send + Future<Output = Result<()>>,
446 M: EnvelopedMessage,
447 {
448 let prev_handler = self.handlers.insert(
449 TypeId::of::<M>(),
450 Box::new(move |envelope, session| {
451 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
452 let span = info_span!(
453 "handle message",
454 payload_type = envelope.payload_type_name()
455 );
456 span.in_scope(|| {
457 tracing::info!(
458 payload_type = envelope.payload_type_name(),
459 "message received"
460 );
461 });
462 let start_time = Instant::now();
463 let future = (handler)(*envelope, session);
464 async move {
465 let result = future.await;
466 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
467 match result {
468 Err(error) => {
469 tracing::error!(%error, ?duration_ms, "error handling message")
470 }
471 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
472 }
473 }
474 .instrument(span)
475 .boxed()
476 }),
477 );
478 if prev_handler.is_some() {
479 panic!("registered a handler for the same message twice");
480 }
481 self
482 }
483
484 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
485 where
486 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
487 Fut: 'static + Send + Future<Output = Result<()>>,
488 M: EnvelopedMessage,
489 {
490 self.add_handler(move |envelope, session| handler(envelope.payload, session));
491 self
492 }
493
494 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
495 where
496 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
497 Fut: Send + Future<Output = Result<()>>,
498 M: RequestMessage,
499 {
500 let handler = Arc::new(handler);
501 self.add_handler(move |envelope, session| {
502 let receipt = envelope.receipt();
503 let handler = handler.clone();
504 async move {
505 let peer = session.peer.clone();
506 let responded = Arc::new(AtomicBool::default());
507 let response = Response {
508 peer: peer.clone(),
509 responded: responded.clone(),
510 receipt,
511 };
512 match (handler)(envelope.payload, response, session).await {
513 Ok(()) => {
514 if responded.load(std::sync::atomic::Ordering::SeqCst) {
515 Ok(())
516 } else {
517 Err(anyhow!("handler did not send a response"))?
518 }
519 }
520 Err(error) => {
521 peer.respond_with_error(
522 receipt,
523 proto::Error {
524 message: error.to_string(),
525 },
526 )?;
527 Err(error)
528 }
529 }
530 }
531 })
532 }
533
/// Returns a future that services one client connection until it closes, its I/O ends, or
/// the server is torn down.
///
/// The future registers the connection with the peer, sends the initial state (hello,
/// contacts, channels, any pending incoming call), then dispatches incoming messages to the
/// registered handlers. When the loop ends because the connection closed or its I/O
/// finished, `connection_lost` runs; on server teardown the future returns immediately.
///
/// * `address` — remote address, used only as a logging field.
/// * `send_connection_id` — if provided, receives the connection id once it is assigned.
/// * `executor` — supplies the peer's sleeps and runs background handlers.
pub fn handle_connection(
    self: &Arc<Self>,
    connection: Connection,
    address: String,
    user: User,
    mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
    executor: Executor,
) -> impl Future<Output = Result<()>> {
    let this = self.clone();
    let user_id = user.id;
    let login = user.github_login;
    let span = info_span!("handle connection", %user_id, %login, %address);
    // Subscribed before the future is first polled.
    let mut teardown = self.teardown.subscribe();
    async move {
        let (connection_id, handle_io, mut incoming_rx) = this
            .peer
            .add_connection(connection, {
                let executor = executor.clone();
                move |duration| executor.sleep(duration)
            });

        tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
        this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
        tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

        if let Some(send_connection_id) = send_connection_id.take() {
            // The receiver may already be gone; that is not an error here.
            let _ = send_connection_id.send(connection_id);
        }

        // First connection ever for this user: ask the client to show contacts and record
        // that the user has connected once.
        if !user.connected_once {
            this.peer.send(connection_id, proto::ShowContacts {})?;
            this.app_state.db.set_user_connected_once(user_id, true).await?;
        }

        // The three lookups run concurrently; the first error aborts the connection.
        let (contacts, channels_for_user, channel_invites) = future::try_join3(
            this.app_state.db.get_contacts(user_id),
            this.app_state.db.get_channels_for_user(user_id),
            this.app_state.db.get_channel_invites_for_user(user_id)
        ).await?;

        // Scoped so the pool lock is released before the next `.await`.
        {
            let mut pool = this.connection_pool.lock();
            pool.add_connection(connection_id, user_id, user.admin);
            this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
            this.peer.send(connection_id, build_initial_channels_update(
                channels_for_user,
                channel_invites
            ))?;
        }

        if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
            this.peer.send(connection_id, incoming_call)?;
        }

        let session = Session {
            user_id,
            connection_id,
            db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
            peer: this.peer.clone(),
            connection_pool: this.connection_pool.clone(),
            live_kit_client: this.app_state.live_kit_client.clone(),
            executor: executor.clone(),
        };
        update_user_contacts(user_id, &session).await?;

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);

        // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
        // This prevents deadlocks when, e.g., client A performs a request to client B and
        // client B performs a request to client A. If both clients stopped processing further
        // messages until their respective request completed, neither would get a chance to
        // respond to the other client's request, and both would deadlock.
        //
        // This arrangement ensures we attempt to process earlier messages first, but fall
        // back to processing messages that arrived later in the spirit of making progress.
        let mut foreground_message_handlers = FuturesUnordered::new();
        // Caps the number of handlers in flight for this connection: a permit is acquired
        // before the next message is read and released when its handler finishes.
        let concurrent_handlers = Arc::new(Semaphore::new(256));
        loop {
            let next_message = async {
                let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                let message = incoming_rx.next().await;
                (permit, message)
            }.fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` polls the branches in the order written: teardown first,
            // then I/O, then handler completions, and only then a new message.
            futures::select_biased! {
                // Server teardown: return at once, without running `connection_lost`.
                _ = teardown.changed().fuse() => return Ok(()),
                result = handle_io => {
                    if let Err(error) = result {
                        tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                    }
                    break;
                }
                // Drives the foreground handlers; their output is `()`.
                _ = foreground_message_handlers.next() => {}
                next_message = next_message => {
                    let (permit, message) = next_message;
                    if let Some(message) = message {
                        let type_name = message.payload_type_name();
                        let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                        let span_enter = span.enter();
                        if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                            let is_background = message.is_background();
                            let handle_message = (handler)(message, session.clone());
                            // Leave the span before the future is stored; `instrument`
                            // re-enters it whenever the future is polled.
                            drop(span_enter);

                            let handle_message = async move {
                                handle_message.await;
                                drop(permit);
                            }.instrument(span);
                            if is_background {
                                executor.spawn_detached(handle_message);
                            } else {
                                foreground_message_handlers.push(handle_message);
                            }
                        } else {
                            tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                        }
                    } else {
                        // The incoming stream ended.
                        tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                        break;
                    }
                }
            }
        }

        // Dropping the set drops every foreground handler that has not finished yet.
        drop(foreground_message_handlers);
        tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
        if let Err(error) = connection_lost(session, teardown, executor).await {
            tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
        }

        Ok(())
    }.instrument(span)
}
668
669 pub async fn invite_code_redeemed(
670 self: &Arc<Self>,
671 inviter_id: UserId,
672 invitee_id: UserId,
673 ) -> Result<()> {
674 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
675 if let Some(code) = &user.invite_code {
676 let pool = self.connection_pool.lock();
677 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
678 for connection_id in pool.user_connection_ids(inviter_id) {
679 self.peer.send(
680 connection_id,
681 proto::UpdateContacts {
682 contacts: vec![invitee_contact.clone()],
683 ..Default::default()
684 },
685 )?;
686 self.peer.send(
687 connection_id,
688 proto::UpdateInviteInfo {
689 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
690 count: user.invite_count as u32,
691 },
692 )?;
693 }
694 }
695 }
696 Ok(())
697 }
698
699 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
700 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
701 if let Some(invite_code) = &user.invite_code {
702 let pool = self.connection_pool.lock();
703 for connection_id in pool.user_connection_ids(user_id) {
704 self.peer.send(
705 connection_id,
706 proto::UpdateInviteInfo {
707 url: format!(
708 "{}{}",
709 self.app_state.config.invite_link_prefix, invite_code
710 ),
711 count: user.invite_count as u32,
712 },
713 )?;
714 }
715 }
716 }
717 Ok(())
718 }
719
720 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
721 ServerSnapshot {
722 connection_pool: ConnectionPoolGuard {
723 guard: self.connection_pool.lock(),
724 _not_send: PhantomData,
725 },
726 peer: &self.peer,
727 }
728 }
729}
730
731impl<'a> Deref for ConnectionPoolGuard<'a> {
732 type Target = ConnectionPool;
733
734 fn deref(&self) -> &Self::Target {
735 &*self.guard
736 }
737}
738
739impl<'a> DerefMut for ConnectionPoolGuard<'a> {
740 fn deref_mut(&mut self) -> &mut Self::Target {
741 &mut *self.guard
742 }
743}
744
impl<'a> Drop for ConnectionPoolGuard<'a> {
    // In test builds, runs `check_invariants` every time a guard is released; in all other
    // builds the body is empty.
    // NOTE(review): `check_invariants` is presumably a `ConnectionPool` method reached
    // through `Deref` — confirm.
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
751
752fn broadcast<F>(
753 sender_id: Option<ConnectionId>,
754 receiver_ids: impl IntoIterator<Item = ConnectionId>,
755 mut f: F,
756) where
757 F: FnMut(ConnectionId) -> anyhow::Result<()>,
758{
759 for receiver_id in receiver_ids {
760 if Some(receiver_id) != sender_id {
761 if let Err(error) = f(receiver_id) {
762 tracing::error!("failed to send to {:?} {}", receiver_id, error);
763 }
764 }
765 }
766}
767
lazy_static! {
    // Name of the header that carries the client's protocol version; returned by
    // `ProtocolVersion::name`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
771
772pub struct ProtocolVersion(u32);
773
774impl Header for ProtocolVersion {
775 fn name() -> &'static HeaderName {
776 &ZED_PROTOCOL_VERSION
777 }
778
779 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
780 where
781 Self: Sized,
782 I: Iterator<Item = &'i axum::http::HeaderValue>,
783 {
784 let version = values
785 .next()
786 .ok_or_else(axum::headers::Error::invalid)?
787 .to_str()
788 .map_err(|_| axum::headers::Error::invalid())?
789 .parse()
790 .map_err(|_| axum::headers::Error::invalid())?;
791 Ok(Self(version))
792 }
793
794 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
795 values.extend([self.0.to_string().parse().unwrap()]);
796 }
797}
798
799pub fn routes(server: Arc<Server>) -> Router<Body> {
800 Router::new()
801 .route("/rpc", get(handle_websocket_request))
802 .layer(
803 ServiceBuilder::new()
804 .layer(Extension(server.app_state.clone()))
805 .layer(middleware::from_fn(auth::validate_header)),
806 )
807 .route("/metrics", get(handle_metrics))
808 .layer(Extension(server))
809}
810
811pub async fn handle_websocket_request(
812 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
813 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
814 Extension(server): Extension<Arc<Server>>,
815 Extension(user): Extension<User>,
816 ws: WebSocketUpgrade,
817) -> axum::response::Response {
818 if protocol_version != rpc::PROTOCOL_VERSION {
819 return (
820 StatusCode::UPGRADE_REQUIRED,
821 "client must be upgraded".to_string(),
822 )
823 .into_response();
824 }
825 let socket_address = socket_address.to_string();
826 ws.on_upgrade(move |socket| {
827 use util::ResultExt;
828 let socket = socket
829 .map_ok(to_tungstenite_message)
830 .err_into()
831 .with(|message| async move { Ok(to_axum_message(message)) });
832 let connection = Connection::new(Box::pin(socket));
833 async move {
834 server
835 .handle_connection(connection, socket_address, user, None, Executor::Production)
836 .await
837 .log_err();
838 }
839 })
840}
841
842pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
843 let connections = server
844 .connection_pool
845 .lock()
846 .connections()
847 .filter(|connection| !connection.admin)
848 .count();
849
850 METRIC_CONNECTIONS.set(connections as _);
851
852 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
853 METRIC_SHARED_PROJECTS.set(shared_projects as _);
854
855 let encoder = prometheus::TextEncoder::new();
856 let metric_families = prometheus::gather();
857 let encoded_metrics = encoder
858 .encode_to_string(&metric_families)
859 .map_err(|err| anyhow!("{}", err))?;
860 Ok(encoded_metrics)
861}
862
/// Cleans up after a connection has gone away.
///
/// The connection is disconnected from the peer, removed from the pool and marked lost in
/// the database right away. Its remaining resources are released only once
/// `RECONNECT_TIMEOUT` has elapsed; if the server is torn down first, that later cleanup is
/// skipped.
///
/// # Errors
///
/// Returns an error when the connection cannot be removed from the pool or when the final
/// contact update fails. All other failures are passed to `trace_err` and ignored.
#[instrument(err, skip(executor))]
async fn connection_lost(
    session: Session,
    mut teardown: watch::Receiver<()>,
    executor: Executor,
) -> Result<()> {
    session.peer.disconnect(session.connection_id);
    session
        .connection_pool()
        .await
        .remove_connection(session.connection_id)?;

    session
        .db()
        .await
        .connection_lost(session.connection_id)
        .await
        .trace_err();

    // `select_biased!` polls the sleep before the teardown signal; whichever completes
    // first decides which branch runs.
    futures::select_biased! {
        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
            leave_room_for_session(&session).await.trace_err();
            leave_channel_buffers_for_session(&session)
                .await
                .trace_err();

            // Decline a pending call only when the pool reports the user as not online.
            if !session
                .connection_pool()
                .await
                .is_user_online(session.user_id)
            {
                let db = session.db().await;
                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
                    room_updated(&room, &session.peer);
                }
            }

            update_user_contacts(session.user_id, &session).await?;
        }
        _ = teardown.changed().fuse() => {}
    }

    Ok(())
}
908
909async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
910 response.send(proto::Ack {})?;
911 Ok(())
912}
913
/// Creates a new room for the requesting user and replies with the room and, when
/// available, the LiveKit connection info.
///
/// A missing LiveKit client, a failed room creation or a failed token request leaves
/// `live_kit_connection_info` as `None`; the room is still created in the database.
async fn create_room(
    _request: proto::CreateRoom,
    response: Response<proto::CreateRoom>,
    session: Session,
) -> Result<()> {
    // A random 30-character id names the LiveKit room.
    let live_kit_room = nanoid::nanoid!(30);

    let live_kit_connection_info = {
        let live_kit_room = live_kit_room.clone();
        let live_kit = session.live_kit_client.as_ref();

        // Each `?` below bails out of this block with `None`.
        util::async_iife!({
            let live_kit = live_kit?;

            live_kit
                .create_room(live_kit_room.clone())
                .await
                .trace_err()?;

            // The token is issued for the user's id rendered as a string.
            let token = live_kit
                .room_token(&live_kit_room, &session.user_id.to_string())
                .trace_err()?;

            Some(proto::LiveKitConnectionInfo {
                server_url: live_kit.url().into(),
                token,
            })
        })
    }
    .await;

    let room = session
        .db()
        .await
        .create_room(session.user_id, session.connection_id, &live_kit_room)
        .await?;

    response.send(proto::CreateRoomResponse {
        room: Some(room.clone()),
        live_kit_connection_info,
    })?;

    // Runs after the response has been sent; a failure here still returns an error.
    update_user_contacts(session.user_id, &session).await?;
    Ok(())
}
959
960async fn join_room(
961 request: proto::JoinRoom,
962 response: Response<proto::JoinRoom>,
963 session: Session,
964) -> Result<()> {
965 let room_id = RoomId::from_proto(request.id);
966 let joined_room = {
967 let room = session
968 .db()
969 .await
970 .join_room(room_id, session.user_id, session.connection_id)
971 .await?;
972 room_updated(&room.room, &session.peer);
973 room.into_inner()
974 };
975
976 if let Some(channel_id) = joined_room.channel_id {
977 channel_updated(
978 channel_id,
979 &joined_room.room,
980 &joined_room.channel_members,
981 &session.peer,
982 &*session.connection_pool().await,
983 )
984 }
985
986 for connection_id in session
987 .connection_pool()
988 .await
989 .user_connection_ids(session.user_id)
990 {
991 session
992 .peer
993 .send(
994 connection_id,
995 proto::CallCanceled {
996 room_id: room_id.to_proto(),
997 },
998 )
999 .trace_err();
1000 }
1001
1002 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1003 if let Some(token) = live_kit
1004 .room_token(
1005 &joined_room.room.live_kit_room,
1006 &session.user_id.to_string(),
1007 )
1008 .trace_err()
1009 {
1010 Some(proto::LiveKitConnectionInfo {
1011 server_url: live_kit.url().into(),
1012 token,
1013 })
1014 } else {
1015 None
1016 }
1017 } else {
1018 None
1019 };
1020
1021 response.send(proto::JoinRoomResponse {
1022 room: Some(joined_room.room),
1023 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1024 live_kit_connection_info,
1025 })?;
1026
1027 update_user_contacts(session.user_id, &session).await?;
1028 Ok(())
1029}
1030
1031async fn rejoin_room(
1032 request: proto::RejoinRoom,
1033 response: Response<proto::RejoinRoom>,
1034 session: Session,
1035) -> Result<()> {
1036 let room;
1037 let channel_id;
1038 let channel_members;
1039 {
1040 let mut rejoined_room = session
1041 .db()
1042 .await
1043 .rejoin_room(request, session.user_id, session.connection_id)
1044 .await?;
1045
1046 response.send(proto::RejoinRoomResponse {
1047 room: Some(rejoined_room.room.clone()),
1048 reshared_projects: rejoined_room
1049 .reshared_projects
1050 .iter()
1051 .map(|project| proto::ResharedProject {
1052 id: project.id.to_proto(),
1053 collaborators: project
1054 .collaborators
1055 .iter()
1056 .map(|collaborator| collaborator.to_proto())
1057 .collect(),
1058 })
1059 .collect(),
1060 rejoined_projects: rejoined_room
1061 .rejoined_projects
1062 .iter()
1063 .map(|rejoined_project| proto::RejoinedProject {
1064 id: rejoined_project.id.to_proto(),
1065 worktrees: rejoined_project
1066 .worktrees
1067 .iter()
1068 .map(|worktree| proto::WorktreeMetadata {
1069 id: worktree.id,
1070 root_name: worktree.root_name.clone(),
1071 visible: worktree.visible,
1072 abs_path: worktree.abs_path.clone(),
1073 })
1074 .collect(),
1075 collaborators: rejoined_project
1076 .collaborators
1077 .iter()
1078 .map(|collaborator| collaborator.to_proto())
1079 .collect(),
1080 language_servers: rejoined_project.language_servers.clone(),
1081 })
1082 .collect(),
1083 })?;
1084 room_updated(&rejoined_room.room, &session.peer);
1085
1086 for project in &rejoined_room.reshared_projects {
1087 for collaborator in &project.collaborators {
1088 session
1089 .peer
1090 .send(
1091 collaborator.connection_id,
1092 proto::UpdateProjectCollaborator {
1093 project_id: project.id.to_proto(),
1094 old_peer_id: Some(project.old_connection_id.into()),
1095 new_peer_id: Some(session.connection_id.into()),
1096 },
1097 )
1098 .trace_err();
1099 }
1100
1101 broadcast(
1102 Some(session.connection_id),
1103 project
1104 .collaborators
1105 .iter()
1106 .map(|collaborator| collaborator.connection_id),
1107 |connection_id| {
1108 session.peer.forward_send(
1109 session.connection_id,
1110 connection_id,
1111 proto::UpdateProject {
1112 project_id: project.id.to_proto(),
1113 worktrees: project.worktrees.clone(),
1114 },
1115 )
1116 },
1117 );
1118 }
1119
1120 for project in &rejoined_room.rejoined_projects {
1121 for collaborator in &project.collaborators {
1122 session
1123 .peer
1124 .send(
1125 collaborator.connection_id,
1126 proto::UpdateProjectCollaborator {
1127 project_id: project.id.to_proto(),
1128 old_peer_id: Some(project.old_connection_id.into()),
1129 new_peer_id: Some(session.connection_id.into()),
1130 },
1131 )
1132 .trace_err();
1133 }
1134 }
1135
1136 for project in &mut rejoined_room.rejoined_projects {
1137 for worktree in mem::take(&mut project.worktrees) {
1138 #[cfg(any(test, feature = "test-support"))]
1139 const MAX_CHUNK_SIZE: usize = 2;
1140 #[cfg(not(any(test, feature = "test-support")))]
1141 const MAX_CHUNK_SIZE: usize = 256;
1142
1143 // Stream this worktree's entries.
1144 let message = proto::UpdateWorktree {
1145 project_id: project.id.to_proto(),
1146 worktree_id: worktree.id,
1147 abs_path: worktree.abs_path.clone(),
1148 root_name: worktree.root_name,
1149 updated_entries: worktree.updated_entries,
1150 removed_entries: worktree.removed_entries,
1151 scan_id: worktree.scan_id,
1152 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1153 updated_repositories: worktree.updated_repositories,
1154 removed_repositories: worktree.removed_repositories,
1155 };
1156 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1157 session.peer.send(session.connection_id, update.clone())?;
1158 }
1159
1160 // Stream this worktree's diagnostics.
1161 for summary in worktree.diagnostic_summaries {
1162 session.peer.send(
1163 session.connection_id,
1164 proto::UpdateDiagnosticSummary {
1165 project_id: project.id.to_proto(),
1166 worktree_id: worktree.id,
1167 summary: Some(summary),
1168 },
1169 )?;
1170 }
1171
1172 for settings_file in worktree.settings_files {
1173 session.peer.send(
1174 session.connection_id,
1175 proto::UpdateWorktreeSettings {
1176 project_id: project.id.to_proto(),
1177 worktree_id: worktree.id,
1178 path: settings_file.path,
1179 content: Some(settings_file.content),
1180 },
1181 )?;
1182 }
1183 }
1184
1185 for language_server in &project.language_servers {
1186 session.peer.send(
1187 session.connection_id,
1188 proto::UpdateLanguageServer {
1189 project_id: project.id.to_proto(),
1190 language_server_id: language_server.id,
1191 variant: Some(
1192 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1193 proto::LspDiskBasedDiagnosticsUpdated {},
1194 ),
1195 ),
1196 },
1197 )?;
1198 }
1199 }
1200
1201 let rejoined_room = rejoined_room.into_inner();
1202
1203 room = rejoined_room.room;
1204 channel_id = rejoined_room.channel_id;
1205 channel_members = rejoined_room.channel_members;
1206 }
1207
1208 if let Some(channel_id) = channel_id {
1209 channel_updated(
1210 channel_id,
1211 &room,
1212 &channel_members,
1213 &session.peer,
1214 &*session.connection_pool().await,
1215 );
1216 }
1217
1218 update_user_contacts(session.user_id, &session).await?;
1219 Ok(())
1220}
1221
1222async fn leave_room(
1223 _: proto::LeaveRoom,
1224 response: Response<proto::LeaveRoom>,
1225 session: Session,
1226) -> Result<()> {
1227 leave_room_for_session(&session).await?;
1228 response.send(proto::Ack {})?;
1229 Ok(())
1230}
1231
1232async fn call(
1233 request: proto::Call,
1234 response: Response<proto::Call>,
1235 session: Session,
1236) -> Result<()> {
1237 let room_id = RoomId::from_proto(request.room_id);
1238 let calling_user_id = session.user_id;
1239 let calling_connection_id = session.connection_id;
1240 let called_user_id = UserId::from_proto(request.called_user_id);
1241 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1242 if !session
1243 .db()
1244 .await
1245 .has_contact(calling_user_id, called_user_id)
1246 .await?
1247 {
1248 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1249 }
1250
1251 let incoming_call = {
1252 let (room, incoming_call) = &mut *session
1253 .db()
1254 .await
1255 .call(
1256 room_id,
1257 calling_user_id,
1258 calling_connection_id,
1259 called_user_id,
1260 initial_project_id,
1261 )
1262 .await?;
1263 room_updated(&room, &session.peer);
1264 mem::take(incoming_call)
1265 };
1266 update_user_contacts(called_user_id, &session).await?;
1267
1268 let mut calls = session
1269 .connection_pool()
1270 .await
1271 .user_connection_ids(called_user_id)
1272 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1273 .collect::<FuturesUnordered<_>>();
1274
1275 while let Some(call_response) = calls.next().await {
1276 match call_response.as_ref() {
1277 Ok(_) => {
1278 response.send(proto::Ack {})?;
1279 return Ok(());
1280 }
1281 Err(_) => {
1282 call_response.trace_err();
1283 }
1284 }
1285 }
1286
1287 {
1288 let room = session
1289 .db()
1290 .await
1291 .call_failed(room_id, called_user_id)
1292 .await?;
1293 room_updated(&room, &session.peer);
1294 }
1295 update_user_contacts(called_user_id, &session).await?;
1296
1297 Err(anyhow!("failed to ring user"))?
1298}
1299
1300async fn cancel_call(
1301 request: proto::CancelCall,
1302 response: Response<proto::CancelCall>,
1303 session: Session,
1304) -> Result<()> {
1305 let called_user_id = UserId::from_proto(request.called_user_id);
1306 let room_id = RoomId::from_proto(request.room_id);
1307 {
1308 let room = session
1309 .db()
1310 .await
1311 .cancel_call(room_id, session.connection_id, called_user_id)
1312 .await?;
1313 room_updated(&room, &session.peer);
1314 }
1315
1316 for connection_id in session
1317 .connection_pool()
1318 .await
1319 .user_connection_ids(called_user_id)
1320 {
1321 session
1322 .peer
1323 .send(
1324 connection_id,
1325 proto::CallCanceled {
1326 room_id: room_id.to_proto(),
1327 },
1328 )
1329 .trace_err();
1330 }
1331 response.send(proto::Ack {})?;
1332
1333 update_user_contacts(called_user_id, &session).await?;
1334 Ok(())
1335}
1336
1337async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1338 let room_id = RoomId::from_proto(message.room_id);
1339 {
1340 let room = session
1341 .db()
1342 .await
1343 .decline_call(Some(room_id), session.user_id)
1344 .await?
1345 .ok_or_else(|| anyhow!("failed to decline call"))?;
1346 room_updated(&room, &session.peer);
1347 }
1348
1349 for connection_id in session
1350 .connection_pool()
1351 .await
1352 .user_connection_ids(session.user_id)
1353 {
1354 session
1355 .peer
1356 .send(
1357 connection_id,
1358 proto::CallCanceled {
1359 room_id: room_id.to_proto(),
1360 },
1361 )
1362 .trace_err();
1363 }
1364 update_user_contacts(session.user_id, &session).await?;
1365 Ok(())
1366}
1367
1368async fn update_participant_location(
1369 request: proto::UpdateParticipantLocation,
1370 response: Response<proto::UpdateParticipantLocation>,
1371 session: Session,
1372) -> Result<()> {
1373 let room_id = RoomId::from_proto(request.room_id);
1374 let location = request
1375 .location
1376 .ok_or_else(|| anyhow!("invalid location"))?;
1377
1378 let db = session.db().await;
1379 let room = db
1380 .update_room_participant_location(room_id, session.connection_id, location)
1381 .await?;
1382
1383 room_updated(&room, &session.peer);
1384 response.send(proto::Ack {})?;
1385 Ok(())
1386}
1387
1388async fn share_project(
1389 request: proto::ShareProject,
1390 response: Response<proto::ShareProject>,
1391 session: Session,
1392) -> Result<()> {
1393 let (project_id, room) = &*session
1394 .db()
1395 .await
1396 .share_project(
1397 RoomId::from_proto(request.room_id),
1398 session.connection_id,
1399 &request.worktrees,
1400 )
1401 .await?;
1402 response.send(proto::ShareProjectResponse {
1403 project_id: project_id.to_proto(),
1404 })?;
1405 room_updated(&room, &session.peer);
1406
1407 Ok(())
1408}
1409
1410async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1411 let project_id = ProjectId::from_proto(message.project_id);
1412
1413 let (room, guest_connection_ids) = &*session
1414 .db()
1415 .await
1416 .unshare_project(project_id, session.connection_id)
1417 .await?;
1418
1419 broadcast(
1420 Some(session.connection_id),
1421 guest_connection_ids.iter().copied(),
1422 |conn_id| session.peer.send(conn_id, message.clone()),
1423 );
1424 room_updated(&room, &session.peer);
1425
1426 Ok(())
1427}
1428
1429async fn join_project(
1430 request: proto::JoinProject,
1431 response: Response<proto::JoinProject>,
1432 session: Session,
1433) -> Result<()> {
1434 let project_id = ProjectId::from_proto(request.project_id);
1435 let guest_user_id = session.user_id;
1436
1437 tracing::info!(%project_id, "join project");
1438
1439 let (project, replica_id) = &mut *session
1440 .db()
1441 .await
1442 .join_project(project_id, session.connection_id)
1443 .await?;
1444
1445 let collaborators = project
1446 .collaborators
1447 .iter()
1448 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1449 .map(|collaborator| collaborator.to_proto())
1450 .collect::<Vec<_>>();
1451
1452 let worktrees = project
1453 .worktrees
1454 .iter()
1455 .map(|(id, worktree)| proto::WorktreeMetadata {
1456 id: *id,
1457 root_name: worktree.root_name.clone(),
1458 visible: worktree.visible,
1459 abs_path: worktree.abs_path.clone(),
1460 })
1461 .collect::<Vec<_>>();
1462
1463 for collaborator in &collaborators {
1464 session
1465 .peer
1466 .send(
1467 collaborator.peer_id.unwrap().into(),
1468 proto::AddProjectCollaborator {
1469 project_id: project_id.to_proto(),
1470 collaborator: Some(proto::Collaborator {
1471 peer_id: Some(session.connection_id.into()),
1472 replica_id: replica_id.0 as u32,
1473 user_id: guest_user_id.to_proto(),
1474 }),
1475 },
1476 )
1477 .trace_err();
1478 }
1479
1480 // First, we send the metadata associated with each worktree.
1481 response.send(proto::JoinProjectResponse {
1482 worktrees: worktrees.clone(),
1483 replica_id: replica_id.0 as u32,
1484 collaborators: collaborators.clone(),
1485 language_servers: project.language_servers.clone(),
1486 })?;
1487
1488 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1489 #[cfg(any(test, feature = "test-support"))]
1490 const MAX_CHUNK_SIZE: usize = 2;
1491 #[cfg(not(any(test, feature = "test-support")))]
1492 const MAX_CHUNK_SIZE: usize = 256;
1493
1494 // Stream this worktree's entries.
1495 let message = proto::UpdateWorktree {
1496 project_id: project_id.to_proto(),
1497 worktree_id,
1498 abs_path: worktree.abs_path.clone(),
1499 root_name: worktree.root_name,
1500 updated_entries: worktree.entries,
1501 removed_entries: Default::default(),
1502 scan_id: worktree.scan_id,
1503 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1504 updated_repositories: worktree.repository_entries.into_values().collect(),
1505 removed_repositories: Default::default(),
1506 };
1507 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1508 session.peer.send(session.connection_id, update.clone())?;
1509 }
1510
1511 // Stream this worktree's diagnostics.
1512 for summary in worktree.diagnostic_summaries {
1513 session.peer.send(
1514 session.connection_id,
1515 proto::UpdateDiagnosticSummary {
1516 project_id: project_id.to_proto(),
1517 worktree_id: worktree.id,
1518 summary: Some(summary),
1519 },
1520 )?;
1521 }
1522
1523 for settings_file in worktree.settings_files {
1524 session.peer.send(
1525 session.connection_id,
1526 proto::UpdateWorktreeSettings {
1527 project_id: project_id.to_proto(),
1528 worktree_id: worktree.id,
1529 path: settings_file.path,
1530 content: Some(settings_file.content),
1531 },
1532 )?;
1533 }
1534 }
1535
1536 for language_server in &project.language_servers {
1537 session.peer.send(
1538 session.connection_id,
1539 proto::UpdateLanguageServer {
1540 project_id: project_id.to_proto(),
1541 language_server_id: language_server.id,
1542 variant: Some(
1543 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1544 proto::LspDiskBasedDiagnosticsUpdated {},
1545 ),
1546 ),
1547 },
1548 )?;
1549 }
1550
1551 Ok(())
1552}
1553
1554async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1555 let sender_id = session.connection_id;
1556 let project_id = ProjectId::from_proto(request.project_id);
1557
1558 let (room, project) = &*session
1559 .db()
1560 .await
1561 .leave_project(project_id, sender_id)
1562 .await?;
1563 tracing::info!(
1564 %project_id,
1565 host_user_id = %project.host_user_id,
1566 host_connection_id = %project.host_connection_id,
1567 "leave project"
1568 );
1569
1570 project_left(&project, &session);
1571 room_updated(&room, &session.peer);
1572
1573 Ok(())
1574}
1575
1576async fn update_project(
1577 request: proto::UpdateProject,
1578 response: Response<proto::UpdateProject>,
1579 session: Session,
1580) -> Result<()> {
1581 let project_id = ProjectId::from_proto(request.project_id);
1582 let (room, guest_connection_ids) = &*session
1583 .db()
1584 .await
1585 .update_project(project_id, session.connection_id, &request.worktrees)
1586 .await?;
1587 broadcast(
1588 Some(session.connection_id),
1589 guest_connection_ids.iter().copied(),
1590 |connection_id| {
1591 session
1592 .peer
1593 .forward_send(session.connection_id, connection_id, request.clone())
1594 },
1595 );
1596 room_updated(&room, &session.peer);
1597 response.send(proto::Ack {})?;
1598
1599 Ok(())
1600}
1601
1602async fn update_worktree(
1603 request: proto::UpdateWorktree,
1604 response: Response<proto::UpdateWorktree>,
1605 session: Session,
1606) -> Result<()> {
1607 let guest_connection_ids = session
1608 .db()
1609 .await
1610 .update_worktree(&request, session.connection_id)
1611 .await?;
1612
1613 broadcast(
1614 Some(session.connection_id),
1615 guest_connection_ids.iter().copied(),
1616 |connection_id| {
1617 session
1618 .peer
1619 .forward_send(session.connection_id, connection_id, request.clone())
1620 },
1621 );
1622 response.send(proto::Ack {})?;
1623 Ok(())
1624}
1625
1626async fn update_diagnostic_summary(
1627 message: proto::UpdateDiagnosticSummary,
1628 session: Session,
1629) -> Result<()> {
1630 let guest_connection_ids = session
1631 .db()
1632 .await
1633 .update_diagnostic_summary(&message, session.connection_id)
1634 .await?;
1635
1636 broadcast(
1637 Some(session.connection_id),
1638 guest_connection_ids.iter().copied(),
1639 |connection_id| {
1640 session
1641 .peer
1642 .forward_send(session.connection_id, connection_id, message.clone())
1643 },
1644 );
1645
1646 Ok(())
1647}
1648
1649async fn update_worktree_settings(
1650 message: proto::UpdateWorktreeSettings,
1651 session: Session,
1652) -> Result<()> {
1653 let guest_connection_ids = session
1654 .db()
1655 .await
1656 .update_worktree_settings(&message, session.connection_id)
1657 .await?;
1658
1659 broadcast(
1660 Some(session.connection_id),
1661 guest_connection_ids.iter().copied(),
1662 |connection_id| {
1663 session
1664 .peer
1665 .forward_send(session.connection_id, connection_id, message.clone())
1666 },
1667 );
1668
1669 Ok(())
1670}
1671
1672async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1673 broadcast_project_message(request.project_id, request, session).await
1674}
1675
1676async fn start_language_server(
1677 request: proto::StartLanguageServer,
1678 session: Session,
1679) -> Result<()> {
1680 let guest_connection_ids = session
1681 .db()
1682 .await
1683 .start_language_server(&request, session.connection_id)
1684 .await?;
1685
1686 broadcast(
1687 Some(session.connection_id),
1688 guest_connection_ids.iter().copied(),
1689 |connection_id| {
1690 session
1691 .peer
1692 .forward_send(session.connection_id, connection_id, request.clone())
1693 },
1694 );
1695 Ok(())
1696}
1697
1698async fn update_language_server(
1699 request: proto::UpdateLanguageServer,
1700 session: Session,
1701) -> Result<()> {
1702 session.executor.record_backtrace();
1703 let project_id = ProjectId::from_proto(request.project_id);
1704 let project_connection_ids = session
1705 .db()
1706 .await
1707 .project_connection_ids(project_id, session.connection_id)
1708 .await?;
1709 broadcast(
1710 Some(session.connection_id),
1711 project_connection_ids.iter().copied(),
1712 |connection_id| {
1713 session
1714 .peer
1715 .forward_send(session.connection_id, connection_id, request.clone())
1716 },
1717 );
1718 Ok(())
1719}
1720
1721async fn forward_project_request<T>(
1722 request: T,
1723 response: Response<T>,
1724 session: Session,
1725) -> Result<()>
1726where
1727 T: EntityMessage + RequestMessage,
1728{
1729 session.executor.record_backtrace();
1730 let project_id = ProjectId::from_proto(request.remote_entity_id());
1731 let host_connection_id = {
1732 let collaborators = session
1733 .db()
1734 .await
1735 .project_collaborators(project_id, session.connection_id)
1736 .await?;
1737 collaborators
1738 .iter()
1739 .find(|collaborator| collaborator.is_host)
1740 .ok_or_else(|| anyhow!("host not found"))?
1741 .connection_id
1742 };
1743
1744 let payload = session
1745 .peer
1746 .forward_request(session.connection_id, host_connection_id, request)
1747 .await?;
1748
1749 response.send(payload)?;
1750 Ok(())
1751}
1752
1753async fn create_buffer_for_peer(
1754 request: proto::CreateBufferForPeer,
1755 session: Session,
1756) -> Result<()> {
1757 session.executor.record_backtrace();
1758 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1759 session
1760 .peer
1761 .forward_send(session.connection_id, peer_id.into(), request)?;
1762 Ok(())
1763}
1764
1765async fn update_buffer(
1766 request: proto::UpdateBuffer,
1767 response: Response<proto::UpdateBuffer>,
1768 session: Session,
1769) -> Result<()> {
1770 session.executor.record_backtrace();
1771 let project_id = ProjectId::from_proto(request.project_id);
1772 let mut guest_connection_ids;
1773 let mut host_connection_id = None;
1774 {
1775 let collaborators = session
1776 .db()
1777 .await
1778 .project_collaborators(project_id, session.connection_id)
1779 .await?;
1780 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1781 for collaborator in collaborators.iter() {
1782 if collaborator.is_host {
1783 host_connection_id = Some(collaborator.connection_id);
1784 } else {
1785 guest_connection_ids.push(collaborator.connection_id);
1786 }
1787 }
1788 }
1789 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1790
1791 session.executor.record_backtrace();
1792 broadcast(
1793 Some(session.connection_id),
1794 guest_connection_ids,
1795 |connection_id| {
1796 session
1797 .peer
1798 .forward_send(session.connection_id, connection_id, request.clone())
1799 },
1800 );
1801 if host_connection_id != session.connection_id {
1802 session
1803 .peer
1804 .forward_request(session.connection_id, host_connection_id, request.clone())
1805 .await?;
1806 }
1807
1808 response.send(proto::Ack {})?;
1809 Ok(())
1810}
1811
1812async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1813 let project_id = ProjectId::from_proto(request.project_id);
1814 let project_connection_ids = session
1815 .db()
1816 .await
1817 .project_connection_ids(project_id, session.connection_id)
1818 .await?;
1819
1820 broadcast(
1821 Some(session.connection_id),
1822 project_connection_ids.iter().copied(),
1823 |connection_id| {
1824 session
1825 .peer
1826 .forward_send(session.connection_id, connection_id, request.clone())
1827 },
1828 );
1829 Ok(())
1830}
1831
1832async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1833 let project_id = ProjectId::from_proto(request.project_id);
1834 let project_connection_ids = session
1835 .db()
1836 .await
1837 .project_connection_ids(project_id, session.connection_id)
1838 .await?;
1839 broadcast(
1840 Some(session.connection_id),
1841 project_connection_ids.iter().copied(),
1842 |connection_id| {
1843 session
1844 .peer
1845 .forward_send(session.connection_id, connection_id, request.clone())
1846 },
1847 );
1848 Ok(())
1849}
1850
1851async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1852 broadcast_project_message(request.project_id, request, session).await
1853}
1854
1855async fn broadcast_project_message<T: EnvelopedMessage>(
1856 project_id: u64,
1857 request: T,
1858 session: Session,
1859) -> Result<()> {
1860 let project_id = ProjectId::from_proto(project_id);
1861 let project_connection_ids = session
1862 .db()
1863 .await
1864 .project_connection_ids(project_id, session.connection_id)
1865 .await?;
1866 broadcast(
1867 Some(session.connection_id),
1868 project_connection_ids.iter().copied(),
1869 |connection_id| {
1870 session
1871 .peer
1872 .forward_send(session.connection_id, connection_id, request.clone())
1873 },
1874 );
1875 Ok(())
1876}
1877
1878async fn follow(
1879 request: proto::Follow,
1880 response: Response<proto::Follow>,
1881 session: Session,
1882) -> Result<()> {
1883 let project_id = ProjectId::from_proto(request.project_id);
1884 let leader_id = request
1885 .leader_id
1886 .ok_or_else(|| anyhow!("invalid leader id"))?
1887 .into();
1888 let follower_id = session.connection_id;
1889
1890 {
1891 let project_connection_ids = session
1892 .db()
1893 .await
1894 .project_connection_ids(project_id, session.connection_id)
1895 .await?;
1896
1897 if !project_connection_ids.contains(&leader_id) {
1898 Err(anyhow!("no such peer"))?;
1899 }
1900 }
1901
1902 let mut response_payload = session
1903 .peer
1904 .forward_request(session.connection_id, leader_id, request)
1905 .await?;
1906 response_payload
1907 .views
1908 .retain(|view| view.leader_id != Some(follower_id.into()));
1909 response.send(response_payload)?;
1910
1911 let room = session
1912 .db()
1913 .await
1914 .follow(project_id, leader_id, follower_id)
1915 .await?;
1916 room_updated(&room, &session.peer);
1917
1918 Ok(())
1919}
1920
1921async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1922 let project_id = ProjectId::from_proto(request.project_id);
1923 let leader_id = request
1924 .leader_id
1925 .ok_or_else(|| anyhow!("invalid leader id"))?
1926 .into();
1927 let follower_id = session.connection_id;
1928
1929 if !session
1930 .db()
1931 .await
1932 .project_connection_ids(project_id, session.connection_id)
1933 .await?
1934 .contains(&leader_id)
1935 {
1936 Err(anyhow!("no such peer"))?;
1937 }
1938
1939 session
1940 .peer
1941 .forward_send(session.connection_id, leader_id, request)?;
1942
1943 let room = session
1944 .db()
1945 .await
1946 .unfollow(project_id, leader_id, follower_id)
1947 .await?;
1948 room_updated(&room, &session.peer);
1949
1950 Ok(())
1951}
1952
1953async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1954 let project_id = ProjectId::from_proto(request.project_id);
1955 let project_connection_ids = session
1956 .db
1957 .lock()
1958 .await
1959 .project_connection_ids(project_id, session.connection_id)
1960 .await?;
1961
1962 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1963 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1964 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1965 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1966 });
1967 for follower_peer_id in request.follower_ids.iter().copied() {
1968 let follower_connection_id = follower_peer_id.into();
1969 if project_connection_ids.contains(&follower_connection_id)
1970 && Some(follower_peer_id) != leader_id
1971 {
1972 session.peer.forward_send(
1973 session.connection_id,
1974 follower_connection_id,
1975 request.clone(),
1976 )?;
1977 }
1978 }
1979 Ok(())
1980}
1981
1982async fn get_users(
1983 request: proto::GetUsers,
1984 response: Response<proto::GetUsers>,
1985 session: Session,
1986) -> Result<()> {
1987 let user_ids = request
1988 .user_ids
1989 .into_iter()
1990 .map(UserId::from_proto)
1991 .collect();
1992 let users = session
1993 .db()
1994 .await
1995 .get_users_by_ids(user_ids)
1996 .await?
1997 .into_iter()
1998 .map(|user| proto::User {
1999 id: user.id.to_proto(),
2000 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2001 github_login: user.github_login,
2002 })
2003 .collect();
2004 response.send(proto::UsersResponse { users })?;
2005 Ok(())
2006}
2007
2008async fn fuzzy_search_users(
2009 request: proto::FuzzySearchUsers,
2010 response: Response<proto::FuzzySearchUsers>,
2011 session: Session,
2012) -> Result<()> {
2013 let query = request.query;
2014 let users = match query.len() {
2015 0 => vec![],
2016 1 | 2 => session
2017 .db()
2018 .await
2019 .get_user_by_github_login(&query)
2020 .await?
2021 .into_iter()
2022 .collect(),
2023 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2024 };
2025 let users = users
2026 .into_iter()
2027 .filter(|user| user.id != session.user_id)
2028 .map(|user| proto::User {
2029 id: user.id.to_proto(),
2030 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2031 github_login: user.github_login,
2032 })
2033 .collect();
2034 response.send(proto::UsersResponse { users })?;
2035 Ok(())
2036}
2037
2038async fn request_contact(
2039 request: proto::RequestContact,
2040 response: Response<proto::RequestContact>,
2041 session: Session,
2042) -> Result<()> {
2043 let requester_id = session.user_id;
2044 let responder_id = UserId::from_proto(request.responder_id);
2045 if requester_id == responder_id {
2046 return Err(anyhow!("cannot add yourself as a contact"))?;
2047 }
2048
2049 session
2050 .db()
2051 .await
2052 .send_contact_request(requester_id, responder_id)
2053 .await?;
2054
2055 // Update outgoing contact requests of requester
2056 let mut update = proto::UpdateContacts::default();
2057 update.outgoing_requests.push(responder_id.to_proto());
2058 for connection_id in session
2059 .connection_pool()
2060 .await
2061 .user_connection_ids(requester_id)
2062 {
2063 session.peer.send(connection_id, update.clone())?;
2064 }
2065
2066 // Update incoming contact requests of responder
2067 let mut update = proto::UpdateContacts::default();
2068 update
2069 .incoming_requests
2070 .push(proto::IncomingContactRequest {
2071 requester_id: requester_id.to_proto(),
2072 should_notify: true,
2073 });
2074 for connection_id in session
2075 .connection_pool()
2076 .await
2077 .user_connection_ids(responder_id)
2078 {
2079 session.peer.send(connection_id, update.clone())?;
2080 }
2081
2082 response.send(proto::Ack {})?;
2083 Ok(())
2084}
2085
2086async fn respond_to_contact_request(
2087 request: proto::RespondToContactRequest,
2088 response: Response<proto::RespondToContactRequest>,
2089 session: Session,
2090) -> Result<()> {
2091 let responder_id = session.user_id;
2092 let requester_id = UserId::from_proto(request.requester_id);
2093 let db = session.db().await;
2094 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2095 db.dismiss_contact_notification(responder_id, requester_id)
2096 .await?;
2097 } else {
2098 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2099
2100 db.respond_to_contact_request(responder_id, requester_id, accept)
2101 .await?;
2102 let requester_busy = db.is_user_busy(requester_id).await?;
2103 let responder_busy = db.is_user_busy(responder_id).await?;
2104
2105 let pool = session.connection_pool().await;
2106 // Update responder with new contact
2107 let mut update = proto::UpdateContacts::default();
2108 if accept {
2109 update
2110 .contacts
2111 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2112 }
2113 update
2114 .remove_incoming_requests
2115 .push(requester_id.to_proto());
2116 for connection_id in pool.user_connection_ids(responder_id) {
2117 session.peer.send(connection_id, update.clone())?;
2118 }
2119
2120 // Update requester with new contact
2121 let mut update = proto::UpdateContacts::default();
2122 if accept {
2123 update
2124 .contacts
2125 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2126 }
2127 update
2128 .remove_outgoing_requests
2129 .push(responder_id.to_proto());
2130 for connection_id in pool.user_connection_ids(requester_id) {
2131 session.peer.send(connection_id, update.clone())?;
2132 }
2133 }
2134
2135 response.send(proto::Ack {})?;
2136 Ok(())
2137}
2138
2139async fn remove_contact(
2140 request: proto::RemoveContact,
2141 response: Response<proto::RemoveContact>,
2142 session: Session,
2143) -> Result<()> {
2144 let requester_id = session.user_id;
2145 let responder_id = UserId::from_proto(request.user_id);
2146 let db = session.db().await;
2147 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2148
2149 let pool = session.connection_pool().await;
2150 // Update outgoing contact requests of requester
2151 let mut update = proto::UpdateContacts::default();
2152 if contact_accepted {
2153 update.remove_contacts.push(responder_id.to_proto());
2154 } else {
2155 update
2156 .remove_outgoing_requests
2157 .push(responder_id.to_proto());
2158 }
2159 for connection_id in pool.user_connection_ids(requester_id) {
2160 session.peer.send(connection_id, update.clone())?;
2161 }
2162
2163 // Update incoming contact requests of responder
2164 let mut update = proto::UpdateContacts::default();
2165 if contact_accepted {
2166 update.remove_contacts.push(requester_id.to_proto());
2167 } else {
2168 update
2169 .remove_incoming_requests
2170 .push(requester_id.to_proto());
2171 }
2172 for connection_id in pool.user_connection_ids(responder_id) {
2173 session.peer.send(connection_id, update.clone())?;
2174 }
2175
2176 response.send(proto::Ack {})?;
2177 Ok(())
2178}
2179
2180async fn create_channel(
2181 request: proto::CreateChannel,
2182 response: Response<proto::CreateChannel>,
2183 session: Session,
2184) -> Result<()> {
2185 let db = session.db().await;
2186 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2187
2188 if let Some(live_kit) = session.live_kit_client.as_ref() {
2189 live_kit.create_room(live_kit_room.clone()).await?;
2190 }
2191
2192 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2193 let id = db
2194 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2195 .await?;
2196
2197 let channel = proto::Channel {
2198 id: id.to_proto(),
2199 name: request.name,
2200 parent_id: request.parent_id,
2201 };
2202
2203 response.send(proto::ChannelResponse {
2204 channel: Some(channel.clone()),
2205 })?;
2206
2207 let mut update = proto::UpdateChannels::default();
2208 update.channels.push(channel);
2209
2210 let user_ids_to_notify = if let Some(parent_id) = parent_id {
2211 db.get_channel_members(parent_id).await?
2212 } else {
2213 vec![session.user_id]
2214 };
2215
2216 let connection_pool = session.connection_pool().await;
2217 for user_id in user_ids_to_notify {
2218 for connection_id in connection_pool.user_connection_ids(user_id) {
2219 let mut update = update.clone();
2220 if user_id == session.user_id {
2221 update.channel_permissions.push(proto::ChannelPermission {
2222 channel_id: id.to_proto(),
2223 is_admin: true,
2224 });
2225 }
2226 session.peer.send(connection_id, update)?;
2227 }
2228 }
2229
2230 Ok(())
2231}
2232
2233async fn remove_channel(
2234 request: proto::RemoveChannel,
2235 response: Response<proto::RemoveChannel>,
2236 session: Session,
2237) -> Result<()> {
2238 let db = session.db().await;
2239
2240 let channel_id = request.channel_id;
2241 let (removed_channels, member_ids) = db
2242 .remove_channel(ChannelId::from_proto(channel_id), session.user_id)
2243 .await?;
2244 response.send(proto::Ack {})?;
2245
2246 // Notify members of removed channels
2247 let mut update = proto::UpdateChannels::default();
2248 update
2249 .remove_channels
2250 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2251
2252 let connection_pool = session.connection_pool().await;
2253 for member_id in member_ids {
2254 for connection_id in connection_pool.user_connection_ids(member_id) {
2255 session.peer.send(connection_id, update.clone())?;
2256 }
2257 }
2258
2259 Ok(())
2260}
2261
2262async fn invite_channel_member(
2263 request: proto::InviteChannelMember,
2264 response: Response<proto::InviteChannelMember>,
2265 session: Session,
2266) -> Result<()> {
2267 let db = session.db().await;
2268 let channel_id = ChannelId::from_proto(request.channel_id);
2269 let invitee_id = UserId::from_proto(request.user_id);
2270 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2271 .await?;
2272
2273 let (channel, _) = db
2274 .get_channel(channel_id, session.user_id)
2275 .await?
2276 .ok_or_else(|| anyhow!("channel not found"))?;
2277
2278 let mut update = proto::UpdateChannels::default();
2279 update.channel_invitations.push(proto::Channel {
2280 id: channel.id.to_proto(),
2281 name: channel.name,
2282 parent_id: None,
2283 });
2284 for connection_id in session
2285 .connection_pool()
2286 .await
2287 .user_connection_ids(invitee_id)
2288 {
2289 session.peer.send(connection_id, update.clone())?;
2290 }
2291
2292 response.send(proto::Ack {})?;
2293 Ok(())
2294}
2295
2296async fn remove_channel_member(
2297 request: proto::RemoveChannelMember,
2298 response: Response<proto::RemoveChannelMember>,
2299 session: Session,
2300) -> Result<()> {
2301 let db = session.db().await;
2302 let channel_id = ChannelId::from_proto(request.channel_id);
2303 let member_id = UserId::from_proto(request.user_id);
2304
2305 db.remove_channel_member(channel_id, member_id, session.user_id)
2306 .await?;
2307
2308 let mut update = proto::UpdateChannels::default();
2309 update.remove_channels.push(channel_id.to_proto());
2310
2311 for connection_id in session
2312 .connection_pool()
2313 .await
2314 .user_connection_ids(member_id)
2315 {
2316 session.peer.send(connection_id, update.clone())?;
2317 }
2318
2319 response.send(proto::Ack {})?;
2320 Ok(())
2321}
2322
2323async fn set_channel_member_admin(
2324 request: proto::SetChannelMemberAdmin,
2325 response: Response<proto::SetChannelMemberAdmin>,
2326 session: Session,
2327) -> Result<()> {
2328 let db = session.db().await;
2329 let channel_id = ChannelId::from_proto(request.channel_id);
2330 let member_id = UserId::from_proto(request.user_id);
2331 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2332 .await?;
2333
2334 let (channel, has_accepted) = db
2335 .get_channel(channel_id, member_id)
2336 .await?
2337 .ok_or_else(|| anyhow!("channel not found"))?;
2338
2339 let mut update = proto::UpdateChannels::default();
2340 if has_accepted {
2341 update.channel_permissions.push(proto::ChannelPermission {
2342 channel_id: channel.id.to_proto(),
2343 is_admin: request.admin,
2344 });
2345 }
2346
2347 for connection_id in session
2348 .connection_pool()
2349 .await
2350 .user_connection_ids(member_id)
2351 {
2352 session.peer.send(connection_id, update.clone())?;
2353 }
2354
2355 response.send(proto::Ack {})?;
2356 Ok(())
2357}
2358
2359async fn rename_channel(
2360 request: proto::RenameChannel,
2361 response: Response<proto::RenameChannel>,
2362 session: Session,
2363) -> Result<()> {
2364 let db = session.db().await;
2365 let channel_id = ChannelId::from_proto(request.channel_id);
2366 let new_name = db
2367 .rename_channel(channel_id, session.user_id, &request.name)
2368 .await?;
2369
2370 let channel = proto::Channel {
2371 id: request.channel_id,
2372 name: new_name,
2373 parent_id: None,
2374 };
2375 response.send(proto::ChannelResponse {
2376 channel: Some(channel.clone()),
2377 })?;
2378 let mut update = proto::UpdateChannels::default();
2379 update.channels.push(channel);
2380
2381 let member_ids = db.get_channel_members(channel_id).await?;
2382
2383 let connection_pool = session.connection_pool().await;
2384 for member_id in member_ids {
2385 for connection_id in connection_pool.user_connection_ids(member_id) {
2386 session.peer.send(connection_id, update.clone())?;
2387 }
2388 }
2389
2390 Ok(())
2391}
2392
2393async fn get_channel_members(
2394 request: proto::GetChannelMembers,
2395 response: Response<proto::GetChannelMembers>,
2396 session: Session,
2397) -> Result<()> {
2398 let db = session.db().await;
2399 let channel_id = ChannelId::from_proto(request.channel_id);
2400 let members = db
2401 .get_channel_member_details(channel_id, session.user_id)
2402 .await?;
2403 response.send(proto::GetChannelMembersResponse { members })?;
2404 Ok(())
2405}
2406
2407async fn respond_to_channel_invite(
2408 request: proto::RespondToChannelInvite,
2409 response: Response<proto::RespondToChannelInvite>,
2410 session: Session,
2411) -> Result<()> {
2412 let db = session.db().await;
2413 let channel_id = ChannelId::from_proto(request.channel_id);
2414 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2415 .await?;
2416
2417 let mut update = proto::UpdateChannels::default();
2418 update
2419 .remove_channel_invitations
2420 .push(channel_id.to_proto());
2421 if request.accept {
2422 let result = db.get_channels_for_user(session.user_id).await?;
2423 update
2424 .channels
2425 .extend(result.channels.into_iter().map(|channel| proto::Channel {
2426 id: channel.id.to_proto(),
2427 name: channel.name,
2428 parent_id: channel.parent_id.map(ChannelId::to_proto),
2429 }));
2430 update
2431 .channel_participants
2432 .extend(
2433 result
2434 .channel_participants
2435 .into_iter()
2436 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2437 channel_id: channel_id.to_proto(),
2438 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2439 }),
2440 );
2441 update
2442 .channel_permissions
2443 .extend(
2444 result
2445 .channels_with_admin_privileges
2446 .into_iter()
2447 .map(|channel_id| proto::ChannelPermission {
2448 channel_id: channel_id.to_proto(),
2449 is_admin: true,
2450 }),
2451 );
2452 }
2453 session.peer.send(session.connection_id, update)?;
2454 response.send(proto::Ack {})?;
2455
2456 Ok(())
2457}
2458
2459async fn join_channel(
2460 request: proto::JoinChannel,
2461 response: Response<proto::JoinChannel>,
2462 session: Session,
2463) -> Result<()> {
2464 let channel_id = ChannelId::from_proto(request.channel_id);
2465
2466 let joined_room = {
2467 leave_room_for_session(&session).await?;
2468 let db = session.db().await;
2469
2470 let room_id = db.room_id_for_channel(channel_id).await?;
2471
2472 let joined_room = db
2473 .join_room(room_id, session.user_id, session.connection_id)
2474 .await?;
2475
2476 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2477 let token = live_kit
2478 .room_token(
2479 &joined_room.room.live_kit_room,
2480 &session.user_id.to_string(),
2481 )
2482 .trace_err()?;
2483
2484 Some(LiveKitConnectionInfo {
2485 server_url: live_kit.url().into(),
2486 token,
2487 })
2488 });
2489
2490 response.send(proto::JoinRoomResponse {
2491 room: Some(joined_room.room.clone()),
2492 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2493 live_kit_connection_info,
2494 })?;
2495
2496 room_updated(&joined_room.room, &session.peer);
2497
2498 joined_room.into_inner()
2499 };
2500
2501 channel_updated(
2502 channel_id,
2503 &joined_room.room,
2504 &joined_room.channel_members,
2505 &session.peer,
2506 &*session.connection_pool().await,
2507 );
2508
2509 update_user_contacts(session.user_id, &session).await?;
2510
2511 Ok(())
2512}
2513
2514async fn join_channel_buffer(
2515 request: proto::JoinChannelBuffer,
2516 response: Response<proto::JoinChannelBuffer>,
2517 session: Session,
2518) -> Result<()> {
2519 let db = session.db().await;
2520 let channel_id = ChannelId::from_proto(request.channel_id);
2521
2522 let open_response = db
2523 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2524 .await?;
2525
2526 let replica_id = open_response.replica_id;
2527 let collaborators = open_response.collaborators.clone();
2528
2529 response.send(open_response)?;
2530
2531 let update = AddChannelBufferCollaborator {
2532 channel_id: channel_id.to_proto(),
2533 collaborator: Some(proto::Collaborator {
2534 user_id: session.user_id.to_proto(),
2535 peer_id: Some(session.connection_id.into()),
2536 replica_id,
2537 }),
2538 };
2539 channel_buffer_updated(
2540 session.connection_id,
2541 collaborators
2542 .iter()
2543 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2544 &update,
2545 &session.peer,
2546 );
2547
2548 Ok(())
2549}
2550
2551async fn update_channel_buffer(
2552 request: proto::UpdateChannelBuffer,
2553 session: Session,
2554) -> Result<()> {
2555 let db = session.db().await;
2556 let channel_id = ChannelId::from_proto(request.channel_id);
2557
2558 let collaborators = db
2559 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2560 .await?;
2561
2562 channel_buffer_updated(
2563 session.connection_id,
2564 collaborators,
2565 &proto::UpdateChannelBuffer {
2566 channel_id: channel_id.to_proto(),
2567 operations: request.operations,
2568 },
2569 &session.peer,
2570 );
2571 Ok(())
2572}
2573
2574async fn rejoin_channel_buffers(
2575 request: proto::RejoinChannelBuffers,
2576 response: Response<proto::RejoinChannelBuffers>,
2577 session: Session,
2578) -> Result<()> {
2579 let db = session.db().await;
2580 let buffers = db
2581 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2582 .await?;
2583
2584 for buffer in &buffers {
2585 let collaborators_to_notify = buffer
2586 .buffer
2587 .collaborators
2588 .iter()
2589 .filter_map(|c| Some(c.peer_id?.into()));
2590 channel_buffer_updated(
2591 session.connection_id,
2592 collaborators_to_notify,
2593 &proto::UpdateChannelBufferCollaborator {
2594 channel_id: buffer.buffer.channel_id,
2595 old_peer_id: Some(buffer.old_connection_id.into()),
2596 new_peer_id: Some(session.connection_id.into()),
2597 },
2598 &session.peer,
2599 );
2600 }
2601
2602 response.send(proto::RejoinChannelBuffersResponse {
2603 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2604 })?;
2605
2606 Ok(())
2607}
2608
2609async fn leave_channel_buffer(
2610 request: proto::LeaveChannelBuffer,
2611 response: Response<proto::LeaveChannelBuffer>,
2612 session: Session,
2613) -> Result<()> {
2614 let db = session.db().await;
2615 let channel_id = ChannelId::from_proto(request.channel_id);
2616
2617 let collaborators_to_notify = db
2618 .leave_channel_buffer(channel_id, session.connection_id)
2619 .await?;
2620
2621 response.send(Ack {})?;
2622
2623 channel_buffer_updated(
2624 session.connection_id,
2625 collaborators_to_notify,
2626 &proto::RemoveChannelBufferCollaborator {
2627 channel_id: channel_id.to_proto(),
2628 peer_id: Some(session.connection_id.into()),
2629 },
2630 &session.peer,
2631 );
2632
2633 Ok(())
2634}
2635
2636fn channel_buffer_updated<T: EnvelopedMessage>(
2637 sender_id: ConnectionId,
2638 collaborators: impl IntoIterator<Item = ConnectionId>,
2639 message: &T,
2640 peer: &Peer,
2641) {
2642 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2643 peer.send(peer_id.into(), message.clone())
2644 });
2645}
2646
2647async fn send_channel_message(
2648 request: proto::SendChannelMessage,
2649 response: Response<proto::SendChannelMessage>,
2650 session: Session,
2651) -> Result<()> {
2652 // Validate the message body.
2653 let body = request.body.trim().to_string();
2654 if body.len() > MAX_MESSAGE_LEN {
2655 return Err(anyhow!("message is too long"))?;
2656 }
2657 if body.is_empty() {
2658 return Err(anyhow!("message can't be blank"))?;
2659 }
2660
2661 let timestamp = OffsetDateTime::now_utc();
2662 let nonce = request
2663 .nonce
2664 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2665
2666 let channel_id = ChannelId::from_proto(request.channel_id);
2667 let (message_id, connection_ids) = session
2668 .db()
2669 .await
2670 .create_channel_message(
2671 channel_id,
2672 session.user_id,
2673 &body,
2674 timestamp,
2675 nonce.clone().into(),
2676 )
2677 .await?;
2678 let message = proto::ChannelMessage {
2679 sender_id: session.user_id.to_proto(),
2680 id: message_id.to_proto(),
2681 body,
2682 timestamp: timestamp.unix_timestamp() as u64,
2683 nonce: Some(nonce),
2684 };
2685 broadcast(Some(session.connection_id), connection_ids, |connection| {
2686 session.peer.send(
2687 connection,
2688 proto::ChannelMessageSent {
2689 channel_id: channel_id.to_proto(),
2690 message: Some(message.clone()),
2691 },
2692 )
2693 });
2694 response.send(proto::SendChannelMessageResponse {
2695 message: Some(message),
2696 })?;
2697 Ok(())
2698}
2699
2700async fn remove_channel_message(
2701 request: proto::RemoveChannelMessage,
2702 response: Response<proto::RemoveChannelMessage>,
2703 session: Session,
2704) -> Result<()> {
2705 let channel_id = ChannelId::from_proto(request.channel_id);
2706 let message_id = MessageId::from_proto(request.message_id);
2707 let connection_ids = session
2708 .db()
2709 .await
2710 .remove_channel_message(channel_id, message_id, session.user_id)
2711 .await?;
2712 broadcast(Some(session.connection_id), connection_ids, |connection| {
2713 session.peer.send(connection, request.clone())
2714 });
2715 response.send(proto::Ack {})?;
2716 Ok(())
2717}
2718
2719async fn join_channel_chat(
2720 request: proto::JoinChannelChat,
2721 response: Response<proto::JoinChannelChat>,
2722 session: Session,
2723) -> Result<()> {
2724 let channel_id = ChannelId::from_proto(request.channel_id);
2725
2726 let db = session.db().await;
2727 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2728 .await?;
2729 let messages = db
2730 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2731 .await?;
2732 response.send(proto::JoinChannelChatResponse {
2733 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2734 messages,
2735 })?;
2736 Ok(())
2737}
2738
2739async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2740 let channel_id = ChannelId::from_proto(request.channel_id);
2741 session
2742 .db()
2743 .await
2744 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2745 .await?;
2746 Ok(())
2747}
2748
2749async fn get_channel_messages(
2750 request: proto::GetChannelMessages,
2751 response: Response<proto::GetChannelMessages>,
2752 session: Session,
2753) -> Result<()> {
2754 let channel_id = ChannelId::from_proto(request.channel_id);
2755 let messages = session
2756 .db()
2757 .await
2758 .get_channel_messages(
2759 channel_id,
2760 session.user_id,
2761 MESSAGE_COUNT_PER_PAGE,
2762 Some(MessageId::from_proto(request.before_message_id)),
2763 )
2764 .await?;
2765 response.send(proto::GetChannelMessagesResponse {
2766 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2767 messages,
2768 })?;
2769 Ok(())
2770}
2771
2772async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2773 let project_id = ProjectId::from_proto(request.project_id);
2774 let project_connection_ids = session
2775 .db()
2776 .await
2777 .project_connection_ids(project_id, session.connection_id)
2778 .await?;
2779 broadcast(
2780 Some(session.connection_id),
2781 project_connection_ids.iter().copied(),
2782 |connection_id| {
2783 session
2784 .peer
2785 .forward_send(session.connection_id, connection_id, request.clone())
2786 },
2787 );
2788 Ok(())
2789}
2790
2791async fn get_private_user_info(
2792 _request: proto::GetPrivateUserInfo,
2793 response: Response<proto::GetPrivateUserInfo>,
2794 session: Session,
2795) -> Result<()> {
2796 let db = session.db().await;
2797
2798 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2799 let user = db
2800 .get_user_by_id(session.user_id)
2801 .await?
2802 .ok_or_else(|| anyhow!("user not found"))?;
2803 let flags = db.get_user_flags(session.user_id).await?;
2804
2805 response.send(proto::GetPrivateUserInfoResponse {
2806 metrics_id,
2807 staff: user.admin,
2808 flags,
2809 })?;
2810 Ok(())
2811}
2812
2813fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2814 match message {
2815 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2816 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2817 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2818 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2819 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2820 code: frame.code.into(),
2821 reason: frame.reason,
2822 })),
2823 }
2824}
2825
2826fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2827 match message {
2828 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2829 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2830 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2831 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2832 AxumMessage::Close(frame) => {
2833 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2834 code: frame.code.into(),
2835 reason: frame.reason,
2836 }))
2837 }
2838 }
2839}
2840
2841fn build_initial_channels_update(
2842 channels: ChannelsForUser,
2843 channel_invites: Vec<db::Channel>,
2844) -> proto::UpdateChannels {
2845 let mut update = proto::UpdateChannels::default();
2846
2847 for channel in channels.channels {
2848 update.channels.push(proto::Channel {
2849 id: channel.id.to_proto(),
2850 name: channel.name,
2851 parent_id: channel.parent_id.map(|id| id.to_proto()),
2852 });
2853 }
2854
2855 for (channel_id, participants) in channels.channel_participants {
2856 update
2857 .channel_participants
2858 .push(proto::ChannelParticipants {
2859 channel_id: channel_id.to_proto(),
2860 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2861 });
2862 }
2863
2864 update
2865 .channel_permissions
2866 .extend(
2867 channels
2868 .channels_with_admin_privileges
2869 .into_iter()
2870 .map(|id| proto::ChannelPermission {
2871 channel_id: id.to_proto(),
2872 is_admin: true,
2873 }),
2874 );
2875
2876 for channel in channel_invites {
2877 update.channel_invitations.push(proto::Channel {
2878 id: channel.id.to_proto(),
2879 name: channel.name,
2880 parent_id: None,
2881 });
2882 }
2883
2884 update
2885}
2886
2887fn build_initial_contacts_update(
2888 contacts: Vec<db::Contact>,
2889 pool: &ConnectionPool,
2890) -> proto::UpdateContacts {
2891 let mut update = proto::UpdateContacts::default();
2892
2893 for contact in contacts {
2894 match contact {
2895 db::Contact::Accepted {
2896 user_id,
2897 should_notify,
2898 busy,
2899 } => {
2900 update
2901 .contacts
2902 .push(contact_for_user(user_id, should_notify, busy, &pool));
2903 }
2904 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2905 db::Contact::Incoming {
2906 user_id,
2907 should_notify,
2908 } => update
2909 .incoming_requests
2910 .push(proto::IncomingContactRequest {
2911 requester_id: user_id.to_proto(),
2912 should_notify,
2913 }),
2914 }
2915 }
2916
2917 update
2918}
2919
2920fn contact_for_user(
2921 user_id: UserId,
2922 should_notify: bool,
2923 busy: bool,
2924 pool: &ConnectionPool,
2925) -> proto::Contact {
2926 proto::Contact {
2927 user_id: user_id.to_proto(),
2928 online: pool.is_user_online(user_id),
2929 busy,
2930 should_notify,
2931 }
2932}
2933
2934fn room_updated(room: &proto::Room, peer: &Peer) {
2935 broadcast(
2936 None,
2937 room.participants
2938 .iter()
2939 .filter_map(|participant| Some(participant.peer_id?.into())),
2940 |peer_id| {
2941 peer.send(
2942 peer_id.into(),
2943 proto::RoomUpdated {
2944 room: Some(room.clone()),
2945 },
2946 )
2947 },
2948 );
2949}
2950
2951fn channel_updated(
2952 channel_id: ChannelId,
2953 room: &proto::Room,
2954 channel_members: &[UserId],
2955 peer: &Peer,
2956 pool: &ConnectionPool,
2957) {
2958 let participants = room
2959 .participants
2960 .iter()
2961 .map(|p| p.user_id)
2962 .collect::<Vec<_>>();
2963
2964 broadcast(
2965 None,
2966 channel_members
2967 .iter()
2968 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2969 |peer_id| {
2970 peer.send(
2971 peer_id.into(),
2972 proto::UpdateChannels {
2973 channel_participants: vec![proto::ChannelParticipants {
2974 channel_id: channel_id.to_proto(),
2975 participant_user_ids: participants.clone(),
2976 }],
2977 ..Default::default()
2978 },
2979 )
2980 },
2981 );
2982}
2983
2984async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2985 let db = session.db().await;
2986
2987 let contacts = db.get_contacts(user_id).await?;
2988 let busy = db.is_user_busy(user_id).await?;
2989
2990 let pool = session.connection_pool().await;
2991 let updated_contact = contact_for_user(user_id, false, busy, &pool);
2992 for contact in contacts {
2993 if let db::Contact::Accepted {
2994 user_id: contact_user_id,
2995 ..
2996 } = contact
2997 {
2998 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2999 session
3000 .peer
3001 .send(
3002 contact_conn_id,
3003 proto::UpdateContacts {
3004 contacts: vec![updated_contact.clone()],
3005 remove_contacts: Default::default(),
3006 incoming_requests: Default::default(),
3007 remove_incoming_requests: Default::default(),
3008 outgoing_requests: Default::default(),
3009 remove_outgoing_requests: Default::default(),
3010 },
3011 )
3012 .trace_err();
3013 }
3014 }
3015 }
3016 Ok(())
3017}
3018
3019async fn leave_room_for_session(session: &Session) -> Result<()> {
3020 let mut contacts_to_update = HashSet::default();
3021
3022 let room_id;
3023 let canceled_calls_to_user_ids;
3024 let live_kit_room;
3025 let delete_live_kit_room;
3026 let room;
3027 let channel_members;
3028 let channel_id;
3029
3030 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3031 contacts_to_update.insert(session.user_id);
3032
3033 for project in left_room.left_projects.values() {
3034 project_left(project, session);
3035 }
3036
3037 room_id = RoomId::from_proto(left_room.room.id);
3038 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3039 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3040 delete_live_kit_room = left_room.deleted;
3041 room = mem::take(&mut left_room.room);
3042 channel_members = mem::take(&mut left_room.channel_members);
3043 channel_id = left_room.channel_id;
3044
3045 room_updated(&room, &session.peer);
3046 } else {
3047 return Ok(());
3048 }
3049
3050 if let Some(channel_id) = channel_id {
3051 channel_updated(
3052 channel_id,
3053 &room,
3054 &channel_members,
3055 &session.peer,
3056 &*session.connection_pool().await,
3057 );
3058 }
3059
3060 {
3061 let pool = session.connection_pool().await;
3062 for canceled_user_id in canceled_calls_to_user_ids {
3063 for connection_id in pool.user_connection_ids(canceled_user_id) {
3064 session
3065 .peer
3066 .send(
3067 connection_id,
3068 proto::CallCanceled {
3069 room_id: room_id.to_proto(),
3070 },
3071 )
3072 .trace_err();
3073 }
3074 contacts_to_update.insert(canceled_user_id);
3075 }
3076 }
3077
3078 for contact_user_id in contacts_to_update {
3079 update_user_contacts(contact_user_id, &session).await?;
3080 }
3081
3082 if let Some(live_kit) = session.live_kit_client.as_ref() {
3083 live_kit
3084 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3085 .await
3086 .trace_err();
3087
3088 if delete_live_kit_room {
3089 live_kit.delete_room(live_kit_room).await.trace_err();
3090 }
3091 }
3092
3093 Ok(())
3094}
3095
3096async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3097 let left_channel_buffers = session
3098 .db()
3099 .await
3100 .leave_channel_buffers(session.connection_id)
3101 .await?;
3102
3103 for (channel_id, connections) in left_channel_buffers {
3104 channel_buffer_updated(
3105 session.connection_id,
3106 connections,
3107 &proto::RemoveChannelBufferCollaborator {
3108 channel_id: channel_id.to_proto(),
3109 peer_id: Some(session.connection_id.into()),
3110 },
3111 &session.peer,
3112 );
3113 }
3114
3115 Ok(())
3116}
3117
3118fn project_left(project: &db::LeftProject, session: &Session) {
3119 for connection_id in &project.connection_ids {
3120 if project.host_user_id == session.user_id {
3121 session
3122 .peer
3123 .send(
3124 *connection_id,
3125 proto::UnshareProject {
3126 project_id: project.id.to_proto(),
3127 },
3128 )
3129 .trace_err();
3130 } else {
3131 session
3132 .peer
3133 .send(
3134 *connection_id,
3135 proto::RemoveProjectCollaborator {
3136 project_id: project.id.to_proto(),
3137 peer_id: Some(session.connection_id.into()),
3138 },
3139 )
3140 .trace_err();
3141 }
3142 }
3143}
3144
/// Extension trait for turning a fallible value into an `Option` via `trace_err`.
pub trait ResultExt {
    /// The success type yielded by `trace_err`.
    type Ok;

    /// Converts `self` into an `Option` of the success value. The implementation for
    /// `Result` in this file logs the error with `tracing::error!` and returns `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
3150
3151impl<T, E> ResultExt for Result<T, E>
3152where
3153 E: std::fmt::Debug,
3154{
3155 type Ok = T;
3156
3157 fn trace_err(self) -> Option<T> {
3158 match self {
3159 Ok(value) => Some(value),
3160 Err(error) => {
3161 tracing::error!("{:?}", error);
3162 None
3163 }
3164 }
3165 }
3166}