1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
6 executor::Executor,
7 AppState, Result,
8};
9use anyhow::anyhow;
10use async_tungstenite::tungstenite::{
11 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
12};
13use axum::{
14 body::Body,
15 extract::{
16 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
17 ConnectInfo, WebSocketUpgrade,
18 },
19 headers::{Header, HeaderName},
20 http::StatusCode,
21 middleware,
22 response::IntoResponse,
23 routing::get,
24 Extension, Router, TypedHeader,
25};
26use collections::{HashMap, HashSet};
27pub use connection_pool::ConnectionPool;
28use futures::{
29 channel::oneshot,
30 future::{self, BoxFuture},
31 stream::FuturesUnordered,
32 FutureExt, SinkExt, StreamExt, TryStreamExt,
33};
34use lazy_static::lazy_static;
35use prometheus::{register_int_gauge, IntGauge};
36use rpc::{
37 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
38 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
39};
40use serde::{Serialize, Serializer};
41use std::{
42 any::TypeId,
43 fmt,
44 future::Future,
45 marker::PhantomData,
46 mem,
47 net::SocketAddr,
48 ops::{Deref, DerefMut},
49 rc::Rc,
50 sync::{
51 atomic::{AtomicBool, Ordering::SeqCst},
52 Arc,
53 },
54 time::{Duration, Instant},
55};
56use tokio::sync::{watch, Semaphore};
57use tower::ServiceBuilder;
58use tracing::{info_span, instrument, Instrument};
59
/// How long `connection_lost` waits after a connection has dropped before it
/// makes the session leave its room and refreshes the user's contacts.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long the task spawned by `Server::start` sleeps before it refreshes
/// stale rooms and deletes stale servers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
62
lazy_static! {
    // Gauge that `handle_metrics` sets to the number of non-admin connections in the pool.
    // Registration failure panics on first access because of the `unwrap`.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Gauge that `handle_metrics` sets from `db.project_count_excluding_admins()`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
72
/// Type-erased handler stored in `Server::handlers`: it takes a boxed envelope
/// together with the sender's `Session` and returns a boxed future resolving
/// to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
75
/// Handle given to request handlers for answering a single request of type `R`.
struct Response<R> {
    /// Peer through which the answer is sent.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set by `send`; `add_request_handler` reads it to detect handlers that
    /// returned `Ok` without answering.
    responded: Arc<AtomicBool>,
}
81
82impl<R: RequestMessage> Response<R> {
83 fn send(self, payload: R::Response) -> Result<()> {
84 self.responded.store(true, SeqCst);
85 self.peer.respond(self.receipt, payload)?;
86 Ok(())
87 }
88}
89
/// Per-connection context handed to every message handler.
///
/// Built once in `Server::handle_connection` and cloned for each incoming
/// message.
#[derive(Clone)]
struct Session {
    /// Id of the user that owns the connection.
    user_id: UserId,
    /// Id returned by the peer when the connection was added.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; access it through `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// The server's peer, used to send and forward messages.
    peer: Arc<Peer>,
    /// The server's connection pool; access it through `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Optional LiveKit client; handlers skip LiveKit work when it is `None`.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used for sleeping and for spawning detached work.
    executor: Executor,
}
100
101impl Session {
102 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
103 #[cfg(test)]
104 tokio::task::yield_now().await;
105 let guard = self.db.lock().await;
106 #[cfg(test)]
107 tokio::task::yield_now().await;
108 guard
109 }
110
111 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
112 #[cfg(test)]
113 tokio::task::yield_now().await;
114 let guard = self.connection_pool.lock();
115 ConnectionPoolGuard {
116 guard,
117 _not_send: PhantomData,
118 }
119 }
120}
121
122impl fmt::Debug for Session {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("Session")
125 .field("user_id", &self.user_id)
126 .field("connection_id", &self.connection_id)
127 .finish()
128 }
129}
130
/// Newtype around the shared `Database`; it dereferences to `Database`.
struct DbHandle(Arc<Database>);
132
133impl Deref for DbHandle {
134 type Target = Database;
135
136 fn deref(&self) -> &Self::Target {
137 self.0.as_ref()
138 }
139}
140
/// The RPC server: owns the peer, the connection pool and the table of
/// message handlers.
pub struct Server {
    /// Server id; behind a mutex so the test-only `reset` can replace it.
    id: parking_lot::Mutex<ServerId>,
    /// Peer that manages the individual connections.
    peer: Arc<Peer>,
    /// Pool of currently registered connections.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Shared application state (database, configuration, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Watch channel signalled by `teardown`; connection tasks subscribe to it.
    teardown: watch::Sender<()>,
}
150
/// Guard over the locked `ConnectionPool`.
///
/// It dereferences to the pool and, in test builds, checks the pool's
/// invariants when dropped.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is `!Send`, so this marker makes the guard `!Send` as well.
    _not_send: PhantomData<Rc<()>>,
}
155
/// Serializable view of the server returned by `Server::snapshot`.
///
/// It holds the connection-pool lock for as long as it is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through its `Deref` target.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
162
163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
164where
165 S: Serializer,
166 T: Deref<Target = U>,
167 U: Serialize,
168{
169 Serialize::serialize(value.deref(), serializer)
170}
171
172impl Server {
    /// Creates a server with the given id and registers every message and
    /// request handler.
    ///
    /// Registering two handlers for the same message type panics (see
    /// `add_handler`).
    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
        let mut server = Self {
            id: parking_lot::Mutex::new(id),
            // The peer is created from the server id, cast to `u32`.
            peer: Peer::new(id.0 as u32),
            app_state,
            executor,
            connection_pool: Default::default(),
            handlers: Default::default(),
            // Only the sender is stored; receivers are obtained with `subscribe`.
            teardown: watch::channel(()).0,
        };

        // `add_request_handler` is used for handlers that take a `Response`,
        // `add_message_handler` for handlers that do not.
        server
            .add_request_handler(ping)
            .add_request_handler(create_room)
            .add_request_handler(join_room)
            .add_request_handler(rejoin_room)
            .add_request_handler(leave_room)
            .add_request_handler(call)
            .add_request_handler(cancel_call)
            .add_message_handler(decline_call)
            .add_request_handler(update_participant_location)
            .add_request_handler(share_project)
            .add_message_handler(unshare_project)
            .add_request_handler(join_project)
            .add_message_handler(leave_project)
            .add_request_handler(update_project)
            .add_request_handler(update_worktree)
            .add_message_handler(start_language_server)
            .add_message_handler(update_language_server)
            .add_message_handler(update_diagnostic_summary)
            .add_message_handler(update_worktree_settings)
            .add_request_handler(forward_project_request::<proto::GetHover>)
            .add_request_handler(forward_project_request::<proto::GetDefinition>)
            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
            .add_request_handler(forward_project_request::<proto::GetReferences>)
            .add_request_handler(forward_project_request::<proto::SearchProject>)
            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(forward_project_request::<proto::GetCompletions>)
            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(forward_project_request::<proto::PrepareRename>)
            .add_request_handler(forward_project_request::<proto::PerformRename>)
            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
            .add_request_handler(forward_project_request::<proto::InlayHints>)
            .add_message_handler(create_buffer_for_peer)
            .add_request_handler(update_buffer)
            .add_message_handler(update_buffer_file)
            .add_message_handler(buffer_reloaded)
            .add_message_handler(buffer_saved)
            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
            .add_request_handler(get_users)
            .add_request_handler(fuzzy_search_users)
            .add_request_handler(request_contact)
            .add_request_handler(remove_contact)
            .add_request_handler(respond_to_contact_request)
            .add_request_handler(follow)
            .add_message_handler(unfollow)
            .add_message_handler(update_followers)
            .add_message_handler(update_diff_base)
            .add_request_handler(get_private_user_info);

        Arc::new(server)
    }
249
    /// Spawns the detached startup cleanup task and returns immediately.
    ///
    /// Once `CLEANUP_TIMEOUT` has elapsed, the task asks the database for the
    /// stale rooms of this environment and, for each of them, refreshes the
    /// room, tells the affected users that their calls were canceled, pushes
    /// contact updates, and deletes the LiveKit room when the refreshed room has
    /// no participants left. Finally it deletes the stale servers. Failures
    /// inside the task go through `trace_err` and do not abort the remaining
    /// work.
    ///
    /// This function itself always returns `Ok(())`.
    pub async fn start(&self) -> Result<()> {
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        // The timer is created here and awaited inside the spawned task.
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // These stay at their empty defaults when refreshing the room fails.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // The pool lock is scoped to this block so it is released
                        // before the awaits that follow.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Skip this user unless both lookups succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    // Only accepted contacts are notified.
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        // `delete_live_kit_room` is only true when the refreshed room
                        // came back with no participants.
                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
365
366 pub fn teardown(&self) {
367 self.peer.teardown();
368 self.connection_pool.lock().reset();
369 let _ = self.teardown.send(());
370 }
371
372 #[cfg(test)]
373 pub fn reset(&self, id: ServerId) {
374 self.teardown();
375 *self.id.lock() = id;
376 self.peer.reset(id.0 as u32);
377 }
378
379 #[cfg(test)]
380 pub fn id(&self) -> ServerId {
381 *self.id.lock()
382 }
383
384 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
385 where
386 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
387 Fut: 'static + Send + Future<Output = Result<()>>,
388 M: EnvelopedMessage,
389 {
390 let prev_handler = self.handlers.insert(
391 TypeId::of::<M>(),
392 Box::new(move |envelope, session| {
393 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
394 let span = info_span!(
395 "handle message",
396 payload_type = envelope.payload_type_name()
397 );
398 span.in_scope(|| {
399 tracing::info!(
400 payload_type = envelope.payload_type_name(),
401 "message received"
402 );
403 });
404 let start_time = Instant::now();
405 let future = (handler)(*envelope, session);
406 async move {
407 let result = future.await;
408 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
409 match result {
410 Err(error) => {
411 tracing::error!(%error, ?duration_ms, "error handling message")
412 }
413 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
414 }
415 }
416 .instrument(span)
417 .boxed()
418 }),
419 );
420 if prev_handler.is_some() {
421 panic!("registered a handler for the same message twice");
422 }
423 self
424 }
425
426 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
427 where
428 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
429 Fut: 'static + Send + Future<Output = Result<()>>,
430 M: EnvelopedMessage,
431 {
432 self.add_handler(move |envelope, session| handler(envelope.payload, session));
433 self
434 }
435
436 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
437 where
438 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
439 Fut: Send + Future<Output = Result<()>>,
440 M: RequestMessage,
441 {
442 let handler = Arc::new(handler);
443 self.add_handler(move |envelope, session| {
444 let receipt = envelope.receipt();
445 let handler = handler.clone();
446 async move {
447 let peer = session.peer.clone();
448 let responded = Arc::new(AtomicBool::default());
449 let response = Response {
450 peer: peer.clone(),
451 responded: responded.clone(),
452 receipt,
453 };
454 match (handler)(envelope.payload, response, session).await {
455 Ok(()) => {
456 if responded.load(std::sync::atomic::Ordering::SeqCst) {
457 Ok(())
458 } else {
459 Err(anyhow!("handler did not send a response"))?
460 }
461 }
462 Err(error) => {
463 peer.respond_with_error(
464 receipt,
465 proto::Error {
466 message: error.to_string(),
467 },
468 )?;
469 Err(error)
470 }
471 }
472 }
473 })
474 }
475
    /// Returns a future that drives one client connection from registration
    /// until sign-out.
    ///
    /// The future:
    /// 1. adds `connection` to the peer and sends `Hello`;
    /// 2. reports the new connection id through `send_connection_id`, if given;
    /// 3. on the user's first connection, sends `ShowContacts` and records that
    ///    the user has connected once;
    /// 4. adds the connection to the pool and sends the initial contacts, the
    ///    invite info (if any) and a pending incoming call (if any);
    /// 5. dispatches incoming messages to the registered handlers until the
    ///    connection's I/O finishes or the message stream ends;
    /// 6. runs `connection_lost` for the session.
    ///
    /// A teardown signal received inside the message loop makes the future
    /// return `Ok(())` right away, without running step 6.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribed before the future runs, i.e. at call time.
        let mut teardown = self.teardown.subscribe();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            if let Some(send_connection_id) = send_connection_id.take() {
                // The receiver may already be gone; that is not an error here.
                let _ = send_connection_id.send(connection_id);
            }

            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // The pool lock is scoped to this block so it is released before the
            // next await.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // A permit is taken before each message is read and released when its
            // handler finishes, so at most 256 handlers of this connection run at once.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // Branch order matters: teardown first, then I/O completion, then
                // running handlers, and only then the next incoming message.
                futures::select_biased! {
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                // Leave the span before moving it into the instrumented future.
                                drop(span_enter);

                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                // Background messages run as detached tasks; foreground
                                // ones are polled by this loop.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Dropping the set discards any foreground handlers that have not finished.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
612
613 pub async fn invite_code_redeemed(
614 self: &Arc<Self>,
615 inviter_id: UserId,
616 invitee_id: UserId,
617 ) -> Result<()> {
618 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
619 if let Some(code) = &user.invite_code {
620 let pool = self.connection_pool.lock();
621 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
622 for connection_id in pool.user_connection_ids(inviter_id) {
623 self.peer.send(
624 connection_id,
625 proto::UpdateContacts {
626 contacts: vec![invitee_contact.clone()],
627 ..Default::default()
628 },
629 )?;
630 self.peer.send(
631 connection_id,
632 proto::UpdateInviteInfo {
633 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
634 count: user.invite_count as u32,
635 },
636 )?;
637 }
638 }
639 }
640 Ok(())
641 }
642
643 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
644 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
645 if let Some(invite_code) = &user.invite_code {
646 let pool = self.connection_pool.lock();
647 for connection_id in pool.user_connection_ids(user_id) {
648 self.peer.send(
649 connection_id,
650 proto::UpdateInviteInfo {
651 url: format!(
652 "{}{}",
653 self.app_state.config.invite_link_prefix, invite_code
654 ),
655 count: user.invite_count as u32,
656 },
657 )?;
658 }
659 }
660 }
661 Ok(())
662 }
663
664 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
665 ServerSnapshot {
666 connection_pool: ConnectionPoolGuard {
667 guard: self.connection_pool.lock(),
668 _not_send: PhantomData,
669 },
670 peer: &self.peer,
671 }
672 }
673}
674
675impl<'a> Deref for ConnectionPoolGuard<'a> {
676 type Target = ConnectionPool;
677
678 fn deref(&self) -> &Self::Target {
679 &*self.guard
680 }
681}
682
683impl<'a> DerefMut for ConnectionPoolGuard<'a> {
684 fn deref_mut(&mut self) -> &mut Self::Target {
685 &mut *self.guard
686 }
687}
688
impl<'a> Drop for ConnectionPoolGuard<'a> {
    /// In test builds, checks the pool's invariants each time a guard is
    /// released; in other builds this body is empty.
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
695
696fn broadcast<F>(
697 sender_id: Option<ConnectionId>,
698 receiver_ids: impl IntoIterator<Item = ConnectionId>,
699 mut f: F,
700) where
701 F: FnMut(ConnectionId) -> anyhow::Result<()>,
702{
703 for receiver_id in receiver_ids {
704 if Some(receiver_id) != sender_id {
705 if let Err(error) = f(receiver_id) {
706 tracing::error!("failed to send to {:?} {}", receiver_id, error);
707 }
708 }
709 }
710}
711
lazy_static! {
    // Name of the header from which `ProtocolVersion` is decoded.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
715
/// Typed `x-zed-protocol-version` header carrying the client's protocol
/// version as a `u32`.
pub struct ProtocolVersion(u32);
717
718impl Header for ProtocolVersion {
719 fn name() -> &'static HeaderName {
720 &ZED_PROTOCOL_VERSION
721 }
722
723 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
724 where
725 Self: Sized,
726 I: Iterator<Item = &'i axum::http::HeaderValue>,
727 {
728 let version = values
729 .next()
730 .ok_or_else(axum::headers::Error::invalid)?
731 .to_str()
732 .map_err(|_| axum::headers::Error::invalid())?
733 .parse()
734 .map_err(|_| axum::headers::Error::invalid())?;
735 Ok(Self(version))
736 }
737
738 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
739 values.extend([self.0.to_string().parse().unwrap()]);
740 }
741}
742
743pub fn routes(server: Arc<Server>) -> Router<Body> {
744 Router::new()
745 .route("/rpc", get(handle_websocket_request))
746 .layer(
747 ServiceBuilder::new()
748 .layer(Extension(server.app_state.clone()))
749 .layer(middleware::from_fn(auth::validate_header)),
750 )
751 .route("/metrics", get(handle_metrics))
752 .layer(Extension(server))
753}
754
755pub async fn handle_websocket_request(
756 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
757 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
758 Extension(server): Extension<Arc<Server>>,
759 Extension(user): Extension<User>,
760 ws: WebSocketUpgrade,
761) -> axum::response::Response {
762 if protocol_version != rpc::PROTOCOL_VERSION {
763 return (
764 StatusCode::UPGRADE_REQUIRED,
765 "client must be upgraded".to_string(),
766 )
767 .into_response();
768 }
769 let socket_address = socket_address.to_string();
770 ws.on_upgrade(move |socket| {
771 use util::ResultExt;
772 let socket = socket
773 .map_ok(to_tungstenite_message)
774 .err_into()
775 .with(|message| async move { Ok(to_axum_message(message)) });
776 let connection = Connection::new(Box::pin(socket));
777 async move {
778 server
779 .handle_connection(connection, socket_address, user, None, Executor::Production)
780 .await
781 .log_err();
782 }
783 })
784}
785
786pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
787 let connections = server
788 .connection_pool
789 .lock()
790 .connections()
791 .filter(|connection| !connection.admin)
792 .count();
793
794 METRIC_CONNECTIONS.set(connections as _);
795
796 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
797 METRIC_SHARED_PROJECTS.set(shared_projects as _);
798
799 let encoder = prometheus::TextEncoder::new();
800 let metric_families = prometheus::gather();
801 let encoded_metrics = encoder
802 .encode_to_string(&metric_families)
803 .map_err(|err| anyhow!("{}", err))?;
804 Ok(encoded_metrics)
805}
806
/// Cleans up after `session`'s connection has gone away.
///
/// Disconnects the connection from the peer, removes it from the connection
/// pool and records the loss in the database (a database error is traced, not
/// returned). It then waits for whichever happens first:
///
/// - `RECONNECT_TIMEOUT` elapses: the session leaves its room; if the pool no
///   longer reports the user as online, `decline_call` is invoked for them and
///   a resulting room is broadcast; finally the user's contacts are updated.
/// - the server is torn down: nothing further is done.
///
/// # Errors
///
/// Returns an error if removing the connection from the pool fails or if the
/// final contact update fails.
#[instrument(err, skip(executor))]
async fn connection_lost(
    session: Session,
    mut teardown: watch::Receiver<()>,
    executor: Executor,
) -> Result<()> {
    session.peer.disconnect(session.connection_id);
    session
        .connection_pool()
        .await
        .remove_connection(session.connection_id)?;

    session
        .db()
        .await
        .connection_lost(session.connection_id)
        .await
        .trace_err();

    // The timeout branch is listed first, so it is preferred when both are ready.
    futures::select_biased! {
        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
            leave_room_for_session(&session).await.trace_err();

            if !session
                .connection_pool()
                .await
                .is_user_online(session.user_id)
            {
                let db = session.db().await;
                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
                    room_updated(&room, &session.peer);
                }
            }
            update_user_contacts(session.user_id, &session).await?;
        }
        _ = teardown.changed().fuse() => {}
    }

    Ok(())
}
847
848async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
849 response.send(proto::Ack {})?;
850 Ok(())
851}
852
853async fn create_room(
854 _request: proto::CreateRoom,
855 response: Response<proto::CreateRoom>,
856 session: Session,
857) -> Result<()> {
858 let live_kit_room = nanoid::nanoid!(30);
859 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
860 if let Some(_) = live_kit
861 .create_room(live_kit_room.clone())
862 .await
863 .trace_err()
864 {
865 if let Some(token) = live_kit
866 .room_token(&live_kit_room, &session.user_id.to_string())
867 .trace_err()
868 {
869 Some(proto::LiveKitConnectionInfo {
870 server_url: live_kit.url().into(),
871 token,
872 })
873 } else {
874 None
875 }
876 } else {
877 None
878 }
879 } else {
880 None
881 };
882
883 {
884 let room = session
885 .db()
886 .await
887 .create_room(session.user_id, session.connection_id, &live_kit_room)
888 .await?;
889
890 response.send(proto::CreateRoomResponse {
891 room: Some(room.clone()),
892 live_kit_connection_info,
893 })?;
894 }
895
896 update_user_contacts(session.user_id, &session).await?;
897 Ok(())
898}
899
900async fn join_room(
901 request: proto::JoinRoom,
902 response: Response<proto::JoinRoom>,
903 session: Session,
904) -> Result<()> {
905 let room_id = RoomId::from_proto(request.id);
906 let room = {
907 let room = session
908 .db()
909 .await
910 .join_room(room_id, session.user_id, session.connection_id)
911 .await?;
912 room_updated(&room, &session.peer);
913 room.clone()
914 };
915
916 for connection_id in session
917 .connection_pool()
918 .await
919 .user_connection_ids(session.user_id)
920 {
921 session
922 .peer
923 .send(
924 connection_id,
925 proto::CallCanceled {
926 room_id: room_id.to_proto(),
927 },
928 )
929 .trace_err();
930 }
931
932 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
933 if let Some(token) = live_kit
934 .room_token(&room.live_kit_room, &session.user_id.to_string())
935 .trace_err()
936 {
937 Some(proto::LiveKitConnectionInfo {
938 server_url: live_kit.url().into(),
939 token,
940 })
941 } else {
942 None
943 }
944 } else {
945 None
946 };
947
948 response.send(proto::JoinRoomResponse {
949 room: Some(room),
950 live_kit_connection_info,
951 })?;
952
953 update_user_contacts(session.user_id, &session).await?;
954 Ok(())
955}
956
957async fn rejoin_room(
958 request: proto::RejoinRoom,
959 response: Response<proto::RejoinRoom>,
960 session: Session,
961) -> Result<()> {
962 {
963 let mut rejoined_room = session
964 .db()
965 .await
966 .rejoin_room(request, session.user_id, session.connection_id)
967 .await?;
968
969 response.send(proto::RejoinRoomResponse {
970 room: Some(rejoined_room.room.clone()),
971 reshared_projects: rejoined_room
972 .reshared_projects
973 .iter()
974 .map(|project| proto::ResharedProject {
975 id: project.id.to_proto(),
976 collaborators: project
977 .collaborators
978 .iter()
979 .map(|collaborator| collaborator.to_proto())
980 .collect(),
981 })
982 .collect(),
983 rejoined_projects: rejoined_room
984 .rejoined_projects
985 .iter()
986 .map(|rejoined_project| proto::RejoinedProject {
987 id: rejoined_project.id.to_proto(),
988 worktrees: rejoined_project
989 .worktrees
990 .iter()
991 .map(|worktree| proto::WorktreeMetadata {
992 id: worktree.id,
993 root_name: worktree.root_name.clone(),
994 visible: worktree.visible,
995 abs_path: worktree.abs_path.clone(),
996 })
997 .collect(),
998 collaborators: rejoined_project
999 .collaborators
1000 .iter()
1001 .map(|collaborator| collaborator.to_proto())
1002 .collect(),
1003 language_servers: rejoined_project.language_servers.clone(),
1004 })
1005 .collect(),
1006 })?;
1007 room_updated(&rejoined_room.room, &session.peer);
1008
1009 for project in &rejoined_room.reshared_projects {
1010 for collaborator in &project.collaborators {
1011 session
1012 .peer
1013 .send(
1014 collaborator.connection_id,
1015 proto::UpdateProjectCollaborator {
1016 project_id: project.id.to_proto(),
1017 old_peer_id: Some(project.old_connection_id.into()),
1018 new_peer_id: Some(session.connection_id.into()),
1019 },
1020 )
1021 .trace_err();
1022 }
1023
1024 broadcast(
1025 Some(session.connection_id),
1026 project
1027 .collaborators
1028 .iter()
1029 .map(|collaborator| collaborator.connection_id),
1030 |connection_id| {
1031 session.peer.forward_send(
1032 session.connection_id,
1033 connection_id,
1034 proto::UpdateProject {
1035 project_id: project.id.to_proto(),
1036 worktrees: project.worktrees.clone(),
1037 },
1038 )
1039 },
1040 );
1041 }
1042
1043 for project in &rejoined_room.rejoined_projects {
1044 for collaborator in &project.collaborators {
1045 session
1046 .peer
1047 .send(
1048 collaborator.connection_id,
1049 proto::UpdateProjectCollaborator {
1050 project_id: project.id.to_proto(),
1051 old_peer_id: Some(project.old_connection_id.into()),
1052 new_peer_id: Some(session.connection_id.into()),
1053 },
1054 )
1055 .trace_err();
1056 }
1057 }
1058
1059 for project in &mut rejoined_room.rejoined_projects {
1060 for worktree in mem::take(&mut project.worktrees) {
1061 #[cfg(any(test, feature = "test-support"))]
1062 const MAX_CHUNK_SIZE: usize = 2;
1063 #[cfg(not(any(test, feature = "test-support")))]
1064 const MAX_CHUNK_SIZE: usize = 256;
1065
1066 // Stream this worktree's entries.
1067 let message = proto::UpdateWorktree {
1068 project_id: project.id.to_proto(),
1069 worktree_id: worktree.id,
1070 abs_path: worktree.abs_path.clone(),
1071 root_name: worktree.root_name,
1072 updated_entries: worktree.updated_entries,
1073 removed_entries: worktree.removed_entries,
1074 scan_id: worktree.scan_id,
1075 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1076 updated_repositories: worktree.updated_repositories,
1077 removed_repositories: worktree.removed_repositories,
1078 };
1079 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1080 session.peer.send(session.connection_id, update.clone())?;
1081 }
1082
1083 // Stream this worktree's diagnostics.
1084 for summary in worktree.diagnostic_summaries {
1085 session.peer.send(
1086 session.connection_id,
1087 proto::UpdateDiagnosticSummary {
1088 project_id: project.id.to_proto(),
1089 worktree_id: worktree.id,
1090 summary: Some(summary),
1091 },
1092 )?;
1093 }
1094
1095 for settings_file in worktree.settings_files {
1096 session.peer.send(
1097 session.connection_id,
1098 proto::UpdateWorktreeSettings {
1099 project_id: project.id.to_proto(),
1100 worktree_id: worktree.id,
1101 path: settings_file.path,
1102 content: Some(settings_file.content),
1103 },
1104 )?;
1105 }
1106 }
1107
1108 for language_server in &project.language_servers {
1109 session.peer.send(
1110 session.connection_id,
1111 proto::UpdateLanguageServer {
1112 project_id: project.id.to_proto(),
1113 language_server_id: language_server.id,
1114 variant: Some(
1115 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1116 proto::LspDiskBasedDiagnosticsUpdated {},
1117 ),
1118 ),
1119 },
1120 )?;
1121 }
1122 }
1123 }
1124
1125 update_user_contacts(session.user_id, &session).await?;
1126 Ok(())
1127}
1128
1129async fn leave_room(
1130 _: proto::LeaveRoom,
1131 response: Response<proto::LeaveRoom>,
1132 session: Session,
1133) -> Result<()> {
1134 leave_room_for_session(&session).await?;
1135 response.send(proto::Ack {})?;
1136 Ok(())
1137}
1138
1139async fn call(
1140 request: proto::Call,
1141 response: Response<proto::Call>,
1142 session: Session,
1143) -> Result<()> {
1144 let room_id = RoomId::from_proto(request.room_id);
1145 let calling_user_id = session.user_id;
1146 let calling_connection_id = session.connection_id;
1147 let called_user_id = UserId::from_proto(request.called_user_id);
1148 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1149 if !session
1150 .db()
1151 .await
1152 .has_contact(calling_user_id, called_user_id)
1153 .await?
1154 {
1155 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1156 }
1157
1158 let incoming_call = {
1159 let (room, incoming_call) = &mut *session
1160 .db()
1161 .await
1162 .call(
1163 room_id,
1164 calling_user_id,
1165 calling_connection_id,
1166 called_user_id,
1167 initial_project_id,
1168 )
1169 .await?;
1170 room_updated(&room, &session.peer);
1171 mem::take(incoming_call)
1172 };
1173 update_user_contacts(called_user_id, &session).await?;
1174
1175 let mut calls = session
1176 .connection_pool()
1177 .await
1178 .user_connection_ids(called_user_id)
1179 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1180 .collect::<FuturesUnordered<_>>();
1181
1182 while let Some(call_response) = calls.next().await {
1183 match call_response.as_ref() {
1184 Ok(_) => {
1185 response.send(proto::Ack {})?;
1186 return Ok(());
1187 }
1188 Err(_) => {
1189 call_response.trace_err();
1190 }
1191 }
1192 }
1193
1194 {
1195 let room = session
1196 .db()
1197 .await
1198 .call_failed(room_id, called_user_id)
1199 .await?;
1200 room_updated(&room, &session.peer);
1201 }
1202 update_user_contacts(called_user_id, &session).await?;
1203
1204 Err(anyhow!("failed to ring user"))?
1205}
1206
1207async fn cancel_call(
1208 request: proto::CancelCall,
1209 response: Response<proto::CancelCall>,
1210 session: Session,
1211) -> Result<()> {
1212 let called_user_id = UserId::from_proto(request.called_user_id);
1213 let room_id = RoomId::from_proto(request.room_id);
1214 {
1215 let room = session
1216 .db()
1217 .await
1218 .cancel_call(room_id, session.connection_id, called_user_id)
1219 .await?;
1220 room_updated(&room, &session.peer);
1221 }
1222
1223 for connection_id in session
1224 .connection_pool()
1225 .await
1226 .user_connection_ids(called_user_id)
1227 {
1228 session
1229 .peer
1230 .send(
1231 connection_id,
1232 proto::CallCanceled {
1233 room_id: room_id.to_proto(),
1234 },
1235 )
1236 .trace_err();
1237 }
1238 response.send(proto::Ack {})?;
1239
1240 update_user_contacts(called_user_id, &session).await?;
1241 Ok(())
1242}
1243
1244async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1245 let room_id = RoomId::from_proto(message.room_id);
1246 {
1247 let room = session
1248 .db()
1249 .await
1250 .decline_call(Some(room_id), session.user_id)
1251 .await?
1252 .ok_or_else(|| anyhow!("failed to decline call"))?;
1253 room_updated(&room, &session.peer);
1254 }
1255
1256 for connection_id in session
1257 .connection_pool()
1258 .await
1259 .user_connection_ids(session.user_id)
1260 {
1261 session
1262 .peer
1263 .send(
1264 connection_id,
1265 proto::CallCanceled {
1266 room_id: room_id.to_proto(),
1267 },
1268 )
1269 .trace_err();
1270 }
1271 update_user_contacts(session.user_id, &session).await?;
1272 Ok(())
1273}
1274
1275async fn update_participant_location(
1276 request: proto::UpdateParticipantLocation,
1277 response: Response<proto::UpdateParticipantLocation>,
1278 session: Session,
1279) -> Result<()> {
1280 let room_id = RoomId::from_proto(request.room_id);
1281 let location = request
1282 .location
1283 .ok_or_else(|| anyhow!("invalid location"))?;
1284 let room = session
1285 .db()
1286 .await
1287 .update_room_participant_location(room_id, session.connection_id, location)
1288 .await?;
1289 room_updated(&room, &session.peer);
1290 response.send(proto::Ack {})?;
1291 Ok(())
1292}
1293
1294async fn share_project(
1295 request: proto::ShareProject,
1296 response: Response<proto::ShareProject>,
1297 session: Session,
1298) -> Result<()> {
1299 let (project_id, room) = &*session
1300 .db()
1301 .await
1302 .share_project(
1303 RoomId::from_proto(request.room_id),
1304 session.connection_id,
1305 &request.worktrees,
1306 )
1307 .await?;
1308 response.send(proto::ShareProjectResponse {
1309 project_id: project_id.to_proto(),
1310 })?;
1311 room_updated(&room, &session.peer);
1312
1313 Ok(())
1314}
1315
1316async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1317 let project_id = ProjectId::from_proto(message.project_id);
1318
1319 let (room, guest_connection_ids) = &*session
1320 .db()
1321 .await
1322 .unshare_project(project_id, session.connection_id)
1323 .await?;
1324
1325 broadcast(
1326 Some(session.connection_id),
1327 guest_connection_ids.iter().copied(),
1328 |conn_id| session.peer.send(conn_id, message.clone()),
1329 );
1330 room_updated(&room, &session.peer);
1331
1332 Ok(())
1333}
1334
1335async fn join_project(
1336 request: proto::JoinProject,
1337 response: Response<proto::JoinProject>,
1338 session: Session,
1339) -> Result<()> {
1340 let project_id = ProjectId::from_proto(request.project_id);
1341 let guest_user_id = session.user_id;
1342
1343 tracing::info!(%project_id, "join project");
1344
1345 let (project, replica_id) = &mut *session
1346 .db()
1347 .await
1348 .join_project(project_id, session.connection_id)
1349 .await?;
1350
1351 let collaborators = project
1352 .collaborators
1353 .iter()
1354 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1355 .map(|collaborator| collaborator.to_proto())
1356 .collect::<Vec<_>>();
1357
1358 let worktrees = project
1359 .worktrees
1360 .iter()
1361 .map(|(id, worktree)| proto::WorktreeMetadata {
1362 id: *id,
1363 root_name: worktree.root_name.clone(),
1364 visible: worktree.visible,
1365 abs_path: worktree.abs_path.clone(),
1366 })
1367 .collect::<Vec<_>>();
1368
1369 for collaborator in &collaborators {
1370 session
1371 .peer
1372 .send(
1373 collaborator.peer_id.unwrap().into(),
1374 proto::AddProjectCollaborator {
1375 project_id: project_id.to_proto(),
1376 collaborator: Some(proto::Collaborator {
1377 peer_id: Some(session.connection_id.into()),
1378 replica_id: replica_id.0 as u32,
1379 user_id: guest_user_id.to_proto(),
1380 }),
1381 },
1382 )
1383 .trace_err();
1384 }
1385
1386 // First, we send the metadata associated with each worktree.
1387 response.send(proto::JoinProjectResponse {
1388 worktrees: worktrees.clone(),
1389 replica_id: replica_id.0 as u32,
1390 collaborators: collaborators.clone(),
1391 language_servers: project.language_servers.clone(),
1392 })?;
1393
1394 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1395 #[cfg(any(test, feature = "test-support"))]
1396 const MAX_CHUNK_SIZE: usize = 2;
1397 #[cfg(not(any(test, feature = "test-support")))]
1398 const MAX_CHUNK_SIZE: usize = 256;
1399
1400 // Stream this worktree's entries.
1401 let message = proto::UpdateWorktree {
1402 project_id: project_id.to_proto(),
1403 worktree_id,
1404 abs_path: worktree.abs_path.clone(),
1405 root_name: worktree.root_name,
1406 updated_entries: worktree.entries,
1407 removed_entries: Default::default(),
1408 scan_id: worktree.scan_id,
1409 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1410 updated_repositories: worktree.repository_entries.into_values().collect(),
1411 removed_repositories: Default::default(),
1412 };
1413 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1414 session.peer.send(session.connection_id, update.clone())?;
1415 }
1416
1417 // Stream this worktree's diagnostics.
1418 for summary in worktree.diagnostic_summaries {
1419 session.peer.send(
1420 session.connection_id,
1421 proto::UpdateDiagnosticSummary {
1422 project_id: project_id.to_proto(),
1423 worktree_id: worktree.id,
1424 summary: Some(summary),
1425 },
1426 )?;
1427 }
1428
1429 for settings_file in worktree.settings_files {
1430 session.peer.send(
1431 session.connection_id,
1432 proto::UpdateWorktreeSettings {
1433 project_id: project_id.to_proto(),
1434 worktree_id: worktree.id,
1435 path: settings_file.path,
1436 content: Some(settings_file.content),
1437 },
1438 )?;
1439 }
1440 }
1441
1442 for language_server in &project.language_servers {
1443 session.peer.send(
1444 session.connection_id,
1445 proto::UpdateLanguageServer {
1446 project_id: project_id.to_proto(),
1447 language_server_id: language_server.id,
1448 variant: Some(
1449 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1450 proto::LspDiskBasedDiagnosticsUpdated {},
1451 ),
1452 ),
1453 },
1454 )?;
1455 }
1456
1457 Ok(())
1458}
1459
1460async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1461 let sender_id = session.connection_id;
1462 let project_id = ProjectId::from_proto(request.project_id);
1463
1464 let (room, project) = &*session
1465 .db()
1466 .await
1467 .leave_project(project_id, sender_id)
1468 .await?;
1469 tracing::info!(
1470 %project_id,
1471 host_user_id = %project.host_user_id,
1472 host_connection_id = %project.host_connection_id,
1473 "leave project"
1474 );
1475
1476 project_left(&project, &session);
1477 room_updated(&room, &session.peer);
1478
1479 Ok(())
1480}
1481
1482async fn update_project(
1483 request: proto::UpdateProject,
1484 response: Response<proto::UpdateProject>,
1485 session: Session,
1486) -> Result<()> {
1487 let project_id = ProjectId::from_proto(request.project_id);
1488 let (room, guest_connection_ids) = &*session
1489 .db()
1490 .await
1491 .update_project(project_id, session.connection_id, &request.worktrees)
1492 .await?;
1493 broadcast(
1494 Some(session.connection_id),
1495 guest_connection_ids.iter().copied(),
1496 |connection_id| {
1497 session
1498 .peer
1499 .forward_send(session.connection_id, connection_id, request.clone())
1500 },
1501 );
1502 room_updated(&room, &session.peer);
1503 response.send(proto::Ack {})?;
1504
1505 Ok(())
1506}
1507
1508async fn update_worktree(
1509 request: proto::UpdateWorktree,
1510 response: Response<proto::UpdateWorktree>,
1511 session: Session,
1512) -> Result<()> {
1513 let guest_connection_ids = session
1514 .db()
1515 .await
1516 .update_worktree(&request, session.connection_id)
1517 .await?;
1518
1519 broadcast(
1520 Some(session.connection_id),
1521 guest_connection_ids.iter().copied(),
1522 |connection_id| {
1523 session
1524 .peer
1525 .forward_send(session.connection_id, connection_id, request.clone())
1526 },
1527 );
1528 response.send(proto::Ack {})?;
1529 Ok(())
1530}
1531
1532async fn update_diagnostic_summary(
1533 message: proto::UpdateDiagnosticSummary,
1534 session: Session,
1535) -> Result<()> {
1536 let guest_connection_ids = session
1537 .db()
1538 .await
1539 .update_diagnostic_summary(&message, session.connection_id)
1540 .await?;
1541
1542 broadcast(
1543 Some(session.connection_id),
1544 guest_connection_ids.iter().copied(),
1545 |connection_id| {
1546 session
1547 .peer
1548 .forward_send(session.connection_id, connection_id, message.clone())
1549 },
1550 );
1551
1552 Ok(())
1553}
1554
1555async fn update_worktree_settings(
1556 message: proto::UpdateWorktreeSettings,
1557 session: Session,
1558) -> Result<()> {
1559 let guest_connection_ids = session
1560 .db()
1561 .await
1562 .update_worktree_settings(&message, session.connection_id)
1563 .await?;
1564
1565 broadcast(
1566 Some(session.connection_id),
1567 guest_connection_ids.iter().copied(),
1568 |connection_id| {
1569 session
1570 .peer
1571 .forward_send(session.connection_id, connection_id, message.clone())
1572 },
1573 );
1574
1575 Ok(())
1576}
1577
1578async fn start_language_server(
1579 request: proto::StartLanguageServer,
1580 session: Session,
1581) -> Result<()> {
1582 let guest_connection_ids = session
1583 .db()
1584 .await
1585 .start_language_server(&request, session.connection_id)
1586 .await?;
1587
1588 broadcast(
1589 Some(session.connection_id),
1590 guest_connection_ids.iter().copied(),
1591 |connection_id| {
1592 session
1593 .peer
1594 .forward_send(session.connection_id, connection_id, request.clone())
1595 },
1596 );
1597 Ok(())
1598}
1599
1600async fn update_language_server(
1601 request: proto::UpdateLanguageServer,
1602 session: Session,
1603) -> Result<()> {
1604 session.executor.record_backtrace();
1605 let project_id = ProjectId::from_proto(request.project_id);
1606 let project_connection_ids = session
1607 .db()
1608 .await
1609 .project_connection_ids(project_id, session.connection_id)
1610 .await?;
1611 broadcast(
1612 Some(session.connection_id),
1613 project_connection_ids.iter().copied(),
1614 |connection_id| {
1615 session
1616 .peer
1617 .forward_send(session.connection_id, connection_id, request.clone())
1618 },
1619 );
1620 Ok(())
1621}
1622
1623async fn forward_project_request<T>(
1624 request: T,
1625 response: Response<T>,
1626 session: Session,
1627) -> Result<()>
1628where
1629 T: EntityMessage + RequestMessage,
1630{
1631 session.executor.record_backtrace();
1632 let project_id = ProjectId::from_proto(request.remote_entity_id());
1633 let host_connection_id = {
1634 let collaborators = session
1635 .db()
1636 .await
1637 .project_collaborators(project_id, session.connection_id)
1638 .await?;
1639 collaborators
1640 .iter()
1641 .find(|collaborator| collaborator.is_host)
1642 .ok_or_else(|| anyhow!("host not found"))?
1643 .connection_id
1644 };
1645
1646 let payload = session
1647 .peer
1648 .forward_request(session.connection_id, host_connection_id, request)
1649 .await?;
1650
1651 response.send(payload)?;
1652 Ok(())
1653}
1654
1655async fn create_buffer_for_peer(
1656 request: proto::CreateBufferForPeer,
1657 session: Session,
1658) -> Result<()> {
1659 session.executor.record_backtrace();
1660 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1661 session
1662 .peer
1663 .forward_send(session.connection_id, peer_id.into(), request)?;
1664 Ok(())
1665}
1666
1667async fn update_buffer(
1668 request: proto::UpdateBuffer,
1669 response: Response<proto::UpdateBuffer>,
1670 session: Session,
1671) -> Result<()> {
1672 session.executor.record_backtrace();
1673 let project_id = ProjectId::from_proto(request.project_id);
1674 let mut guest_connection_ids;
1675 let mut host_connection_id = None;
1676 {
1677 let collaborators = session
1678 .db()
1679 .await
1680 .project_collaborators(project_id, session.connection_id)
1681 .await?;
1682 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1683 for collaborator in collaborators.iter() {
1684 if collaborator.is_host {
1685 host_connection_id = Some(collaborator.connection_id);
1686 } else {
1687 guest_connection_ids.push(collaborator.connection_id);
1688 }
1689 }
1690 }
1691 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1692
1693 session.executor.record_backtrace();
1694 broadcast(
1695 Some(session.connection_id),
1696 guest_connection_ids,
1697 |connection_id| {
1698 session
1699 .peer
1700 .forward_send(session.connection_id, connection_id, request.clone())
1701 },
1702 );
1703 if host_connection_id != session.connection_id {
1704 session
1705 .peer
1706 .forward_request(session.connection_id, host_connection_id, request.clone())
1707 .await?;
1708 }
1709
1710 response.send(proto::Ack {})?;
1711 Ok(())
1712}
1713
1714async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1715 let project_id = ProjectId::from_proto(request.project_id);
1716 let project_connection_ids = session
1717 .db()
1718 .await
1719 .project_connection_ids(project_id, session.connection_id)
1720 .await?;
1721
1722 broadcast(
1723 Some(session.connection_id),
1724 project_connection_ids.iter().copied(),
1725 |connection_id| {
1726 session
1727 .peer
1728 .forward_send(session.connection_id, connection_id, request.clone())
1729 },
1730 );
1731 Ok(())
1732}
1733
1734async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1735 let project_id = ProjectId::from_proto(request.project_id);
1736 let project_connection_ids = session
1737 .db()
1738 .await
1739 .project_connection_ids(project_id, session.connection_id)
1740 .await?;
1741 broadcast(
1742 Some(session.connection_id),
1743 project_connection_ids.iter().copied(),
1744 |connection_id| {
1745 session
1746 .peer
1747 .forward_send(session.connection_id, connection_id, request.clone())
1748 },
1749 );
1750 Ok(())
1751}
1752
1753async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1754 let project_id = ProjectId::from_proto(request.project_id);
1755 let project_connection_ids = session
1756 .db()
1757 .await
1758 .project_connection_ids(project_id, session.connection_id)
1759 .await?;
1760 broadcast(
1761 Some(session.connection_id),
1762 project_connection_ids.iter().copied(),
1763 |connection_id| {
1764 session
1765 .peer
1766 .forward_send(session.connection_id, connection_id, request.clone())
1767 },
1768 );
1769 Ok(())
1770}
1771
1772async fn follow(
1773 request: proto::Follow,
1774 response: Response<proto::Follow>,
1775 session: Session,
1776) -> Result<()> {
1777 let project_id = ProjectId::from_proto(request.project_id);
1778 let leader_id = request
1779 .leader_id
1780 .ok_or_else(|| anyhow!("invalid leader id"))?
1781 .into();
1782 let follower_id = session.connection_id;
1783
1784 {
1785 let project_connection_ids = session
1786 .db()
1787 .await
1788 .project_connection_ids(project_id, session.connection_id)
1789 .await?;
1790
1791 if !project_connection_ids.contains(&leader_id) {
1792 Err(anyhow!("no such peer"))?;
1793 }
1794 }
1795
1796 let mut response_payload = session
1797 .peer
1798 .forward_request(session.connection_id, leader_id, request)
1799 .await?;
1800 response_payload
1801 .views
1802 .retain(|view| view.leader_id != Some(follower_id.into()));
1803 response.send(response_payload)?;
1804
1805 let room = session
1806 .db()
1807 .await
1808 .follow(project_id, leader_id, follower_id)
1809 .await?;
1810 room_updated(&room, &session.peer);
1811
1812 Ok(())
1813}
1814
1815async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1816 let project_id = ProjectId::from_proto(request.project_id);
1817 let leader_id = request
1818 .leader_id
1819 .ok_or_else(|| anyhow!("invalid leader id"))?
1820 .into();
1821 let follower_id = session.connection_id;
1822
1823 if !session
1824 .db()
1825 .await
1826 .project_connection_ids(project_id, session.connection_id)
1827 .await?
1828 .contains(&leader_id)
1829 {
1830 Err(anyhow!("no such peer"))?;
1831 }
1832
1833 session
1834 .peer
1835 .forward_send(session.connection_id, leader_id, request)?;
1836
1837 let room = session
1838 .db()
1839 .await
1840 .unfollow(project_id, leader_id, follower_id)
1841 .await?;
1842 room_updated(&room, &session.peer);
1843
1844 Ok(())
1845}
1846
1847async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1848 let project_id = ProjectId::from_proto(request.project_id);
1849 let project_connection_ids = session
1850 .db
1851 .lock()
1852 .await
1853 .project_connection_ids(project_id, session.connection_id)
1854 .await?;
1855
1856 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1857 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1858 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1859 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1860 });
1861 for follower_peer_id in request.follower_ids.iter().copied() {
1862 let follower_connection_id = follower_peer_id.into();
1863 if project_connection_ids.contains(&follower_connection_id)
1864 && Some(follower_peer_id) != leader_id
1865 {
1866 session.peer.forward_send(
1867 session.connection_id,
1868 follower_connection_id,
1869 request.clone(),
1870 )?;
1871 }
1872 }
1873 Ok(())
1874}
1875
1876async fn get_users(
1877 request: proto::GetUsers,
1878 response: Response<proto::GetUsers>,
1879 session: Session,
1880) -> Result<()> {
1881 let user_ids = request
1882 .user_ids
1883 .into_iter()
1884 .map(UserId::from_proto)
1885 .collect();
1886 let users = session
1887 .db()
1888 .await
1889 .get_users_by_ids(user_ids)
1890 .await?
1891 .into_iter()
1892 .map(|user| proto::User {
1893 id: user.id.to_proto(),
1894 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1895 github_login: user.github_login,
1896 })
1897 .collect();
1898 response.send(proto::UsersResponse { users })?;
1899 Ok(())
1900}
1901
1902async fn fuzzy_search_users(
1903 request: proto::FuzzySearchUsers,
1904 response: Response<proto::FuzzySearchUsers>,
1905 session: Session,
1906) -> Result<()> {
1907 let query = request.query;
1908 let users = match query.len() {
1909 0 => vec![],
1910 1 | 2 => session
1911 .db()
1912 .await
1913 .get_user_by_github_login(&query)
1914 .await?
1915 .into_iter()
1916 .collect(),
1917 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1918 };
1919 let users = users
1920 .into_iter()
1921 .filter(|user| user.id != session.user_id)
1922 .map(|user| proto::User {
1923 id: user.id.to_proto(),
1924 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1925 github_login: user.github_login,
1926 })
1927 .collect();
1928 response.send(proto::UsersResponse { users })?;
1929 Ok(())
1930}
1931
1932async fn request_contact(
1933 request: proto::RequestContact,
1934 response: Response<proto::RequestContact>,
1935 session: Session,
1936) -> Result<()> {
1937 let requester_id = session.user_id;
1938 let responder_id = UserId::from_proto(request.responder_id);
1939 if requester_id == responder_id {
1940 return Err(anyhow!("cannot add yourself as a contact"))?;
1941 }
1942
1943 session
1944 .db()
1945 .await
1946 .send_contact_request(requester_id, responder_id)
1947 .await?;
1948
1949 // Update outgoing contact requests of requester
1950 let mut update = proto::UpdateContacts::default();
1951 update.outgoing_requests.push(responder_id.to_proto());
1952 for connection_id in session
1953 .connection_pool()
1954 .await
1955 .user_connection_ids(requester_id)
1956 {
1957 session.peer.send(connection_id, update.clone())?;
1958 }
1959
1960 // Update incoming contact requests of responder
1961 let mut update = proto::UpdateContacts::default();
1962 update
1963 .incoming_requests
1964 .push(proto::IncomingContactRequest {
1965 requester_id: requester_id.to_proto(),
1966 should_notify: true,
1967 });
1968 for connection_id in session
1969 .connection_pool()
1970 .await
1971 .user_connection_ids(responder_id)
1972 {
1973 session.peer.send(connection_id, update.clone())?;
1974 }
1975
1976 response.send(proto::Ack {})?;
1977 Ok(())
1978}
1979
1980async fn respond_to_contact_request(
1981 request: proto::RespondToContactRequest,
1982 response: Response<proto::RespondToContactRequest>,
1983 session: Session,
1984) -> Result<()> {
1985 let responder_id = session.user_id;
1986 let requester_id = UserId::from_proto(request.requester_id);
1987 let db = session.db().await;
1988 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1989 db.dismiss_contact_notification(responder_id, requester_id)
1990 .await?;
1991 } else {
1992 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1993
1994 db.respond_to_contact_request(responder_id, requester_id, accept)
1995 .await?;
1996 let requester_busy = db.is_user_busy(requester_id).await?;
1997 let responder_busy = db.is_user_busy(responder_id).await?;
1998
1999 let pool = session.connection_pool().await;
2000 // Update responder with new contact
2001 let mut update = proto::UpdateContacts::default();
2002 if accept {
2003 update
2004 .contacts
2005 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2006 }
2007 update
2008 .remove_incoming_requests
2009 .push(requester_id.to_proto());
2010 for connection_id in pool.user_connection_ids(responder_id) {
2011 session.peer.send(connection_id, update.clone())?;
2012 }
2013
2014 // Update requester with new contact
2015 let mut update = proto::UpdateContacts::default();
2016 if accept {
2017 update
2018 .contacts
2019 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2020 }
2021 update
2022 .remove_outgoing_requests
2023 .push(responder_id.to_proto());
2024 for connection_id in pool.user_connection_ids(requester_id) {
2025 session.peer.send(connection_id, update.clone())?;
2026 }
2027 }
2028
2029 response.send(proto::Ack {})?;
2030 Ok(())
2031}
2032
2033async fn remove_contact(
2034 request: proto::RemoveContact,
2035 response: Response<proto::RemoveContact>,
2036 session: Session,
2037) -> Result<()> {
2038 let requester_id = session.user_id;
2039 let responder_id = UserId::from_proto(request.user_id);
2040 let db = session.db().await;
2041 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2042
2043 let pool = session.connection_pool().await;
2044 // Update outgoing contact requests of requester
2045 let mut update = proto::UpdateContacts::default();
2046 if contact_accepted {
2047 update.remove_contacts.push(responder_id.to_proto());
2048 } else {
2049 update
2050 .remove_outgoing_requests
2051 .push(responder_id.to_proto());
2052 }
2053 for connection_id in pool.user_connection_ids(requester_id) {
2054 session.peer.send(connection_id, update.clone())?;
2055 }
2056
2057 // Update incoming contact requests of responder
2058 let mut update = proto::UpdateContacts::default();
2059 if contact_accepted {
2060 update.remove_contacts.push(requester_id.to_proto());
2061 } else {
2062 update
2063 .remove_incoming_requests
2064 .push(requester_id.to_proto());
2065 }
2066 for connection_id in pool.user_connection_ids(responder_id) {
2067 session.peer.send(connection_id, update.clone())?;
2068 }
2069
2070 response.send(proto::Ack {})?;
2071 Ok(())
2072}
2073
2074async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2075 let project_id = ProjectId::from_proto(request.project_id);
2076 let project_connection_ids = session
2077 .db()
2078 .await
2079 .project_connection_ids(project_id, session.connection_id)
2080 .await?;
2081 broadcast(
2082 Some(session.connection_id),
2083 project_connection_ids.iter().copied(),
2084 |connection_id| {
2085 session
2086 .peer
2087 .forward_send(session.connection_id, connection_id, request.clone())
2088 },
2089 );
2090 Ok(())
2091}
2092
2093async fn get_private_user_info(
2094 _request: proto::GetPrivateUserInfo,
2095 response: Response<proto::GetPrivateUserInfo>,
2096 session: Session,
2097) -> Result<()> {
2098 let metrics_id = session
2099 .db()
2100 .await
2101 .get_user_metrics_id(session.user_id)
2102 .await?;
2103 let user = session
2104 .db()
2105 .await
2106 .get_user_by_id(session.user_id)
2107 .await?
2108 .ok_or_else(|| anyhow!("user not found"))?;
2109 response.send(proto::GetPrivateUserInfoResponse {
2110 metrics_id,
2111 staff: user.admin,
2112 })?;
2113 Ok(())
2114}
2115
2116fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2117 match message {
2118 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2119 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2120 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2121 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2122 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2123 code: frame.code.into(),
2124 reason: frame.reason,
2125 })),
2126 }
2127}
2128
2129fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2130 match message {
2131 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2132 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2133 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2134 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2135 AxumMessage::Close(frame) => {
2136 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2137 code: frame.code.into(),
2138 reason: frame.reason,
2139 }))
2140 }
2141 }
2142}
2143
2144fn build_initial_contacts_update(
2145 contacts: Vec<db::Contact>,
2146 pool: &ConnectionPool,
2147) -> proto::UpdateContacts {
2148 let mut update = proto::UpdateContacts::default();
2149
2150 for contact in contacts {
2151 match contact {
2152 db::Contact::Accepted {
2153 user_id,
2154 should_notify,
2155 busy,
2156 } => {
2157 update
2158 .contacts
2159 .push(contact_for_user(user_id, should_notify, busy, &pool));
2160 }
2161 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2162 db::Contact::Incoming {
2163 user_id,
2164 should_notify,
2165 } => update
2166 .incoming_requests
2167 .push(proto::IncomingContactRequest {
2168 requester_id: user_id.to_proto(),
2169 should_notify,
2170 }),
2171 }
2172 }
2173
2174 update
2175}
2176
2177fn contact_for_user(
2178 user_id: UserId,
2179 should_notify: bool,
2180 busy: bool,
2181 pool: &ConnectionPool,
2182) -> proto::Contact {
2183 proto::Contact {
2184 user_id: user_id.to_proto(),
2185 online: pool.is_user_online(user_id),
2186 busy,
2187 should_notify,
2188 }
2189}
2190
2191fn room_updated(room: &proto::Room, peer: &Peer) {
2192 broadcast(
2193 None,
2194 room.participants
2195 .iter()
2196 .filter_map(|participant| Some(participant.peer_id?.into())),
2197 |peer_id| {
2198 peer.send(
2199 peer_id.into(),
2200 proto::RoomUpdated {
2201 room: Some(room.clone()),
2202 },
2203 )
2204 },
2205 );
2206}
2207
2208async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2209 let db = session.db().await;
2210 let contacts = db.get_contacts(user_id).await?;
2211 let busy = db.is_user_busy(user_id).await?;
2212
2213 let pool = session.connection_pool().await;
2214 let updated_contact = contact_for_user(user_id, false, busy, &pool);
2215 for contact in contacts {
2216 if let db::Contact::Accepted {
2217 user_id: contact_user_id,
2218 ..
2219 } = contact
2220 {
2221 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2222 session
2223 .peer
2224 .send(
2225 contact_conn_id,
2226 proto::UpdateContacts {
2227 contacts: vec![updated_contact.clone()],
2228 remove_contacts: Default::default(),
2229 incoming_requests: Default::default(),
2230 remove_incoming_requests: Default::default(),
2231 outgoing_requests: Default::default(),
2232 remove_outgoing_requests: Default::default(),
2233 },
2234 )
2235 .trace_err();
2236 }
2237 }
2238 }
2239 Ok(())
2240}
2241
2242async fn leave_room_for_session(session: &Session) -> Result<()> {
2243 let mut contacts_to_update = HashSet::default();
2244
2245 let room_id;
2246 let canceled_calls_to_user_ids;
2247 let live_kit_room;
2248 let delete_live_kit_room;
2249 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2250 contacts_to_update.insert(session.user_id);
2251
2252 for project in left_room.left_projects.values() {
2253 project_left(project, session);
2254 }
2255
2256 room_updated(&left_room.room, &session.peer);
2257 room_id = RoomId::from_proto(left_room.room.id);
2258 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2259 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2260 delete_live_kit_room = left_room.room.participants.is_empty();
2261 } else {
2262 return Ok(());
2263 }
2264
2265 {
2266 let pool = session.connection_pool().await;
2267 for canceled_user_id in canceled_calls_to_user_ids {
2268 for connection_id in pool.user_connection_ids(canceled_user_id) {
2269 session
2270 .peer
2271 .send(
2272 connection_id,
2273 proto::CallCanceled {
2274 room_id: room_id.to_proto(),
2275 },
2276 )
2277 .trace_err();
2278 }
2279 contacts_to_update.insert(canceled_user_id);
2280 }
2281 }
2282
2283 for contact_user_id in contacts_to_update {
2284 update_user_contacts(contact_user_id, &session).await?;
2285 }
2286
2287 if let Some(live_kit) = session.live_kit_client.as_ref() {
2288 live_kit
2289 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2290 .await
2291 .trace_err();
2292
2293 if delete_live_kit_room {
2294 live_kit.delete_room(live_kit_room).await.trace_err();
2295 }
2296 }
2297
2298 Ok(())
2299}
2300
2301fn project_left(project: &db::LeftProject, session: &Session) {
2302 for connection_id in &project.connection_ids {
2303 if project.host_user_id == session.user_id {
2304 session
2305 .peer
2306 .send(
2307 *connection_id,
2308 proto::UnshareProject {
2309 project_id: project.id.to_proto(),
2310 },
2311 )
2312 .trace_err();
2313 } else {
2314 session
2315 .peer
2316 .send(
2317 *connection_id,
2318 proto::RemoveProjectCollaborator {
2319 project_id: project.id.to_proto(),
2320 peer_id: Some(session.connection_id.into()),
2321 },
2322 )
2323 .trace_err();
2324 }
2325 }
2326}
2327
/// Extension trait for turning a fallible value into an `Option` without
/// propagating the failure to the caller.
pub trait ResultExt {
    /// The success value type returned inside `Some`.
    type Ok;

    /// Consumes `self` and returns the success value, if any.
    ///
    /// The implementation for `Result` in this file logs the error with
    /// `tracing::error!` and returns `None` in the failure case.
    fn trace_err(self) -> Option<Self::Ok>;
}
2333
2334impl<T, E> ResultExt for Result<T, E>
2335where
2336 E: std::fmt::Debug,
2337{
2338 type Ok = T;
2339
2340 fn trace_err(self) -> Option<T> {
2341 match self {
2342 Ok(value) => Some(value),
2343 Err(error) => {
2344 tracing::error!("{:?}", error);
2345 None
2346 }
2347 }
2348 }
2349}