1#[cfg(any(test, feature = "test-support"))]
2pub mod test;
3
4pub mod telemetry;
5pub mod user;
6
7use anyhow::{anyhow, Context as _, Result};
8use async_recursion::async_recursion;
9use async_tungstenite::tungstenite::{
10 error::Error as WebsocketError,
11 http::{Request, StatusCode},
12};
13use futures::{
14 future::BoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, TryStreamExt,
15};
16use gpui2::{
17 serde_json, AnyHandle, AnyWeakHandle, AppContext, AsyncAppContext, Handle, SemanticVersion,
18 Task, WeakHandle,
19};
20use lazy_static::lazy_static;
21use parking_lot::RwLock;
22use postage::watch;
23use rand::prelude::*;
24use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use std::{
28 any::TypeId,
29 collections::HashMap,
30 convert::TryFrom,
31 fmt::Write as _,
32 future::Future,
33 marker::PhantomData,
34 path::PathBuf,
35 sync::{atomic::AtomicU64, Arc, Weak},
36 time::{Duration, Instant},
37};
38use telemetry::Telemetry;
39use thiserror::Error;
40use url::Url;
41use util::channel::ReleaseChannel;
42use util::http::HttpClient;
43use util::{ResultExt, TryFutureExt};
44
45pub use rpc::*;
46pub use telemetry::ClickhouseEvent;
47pub use user::*;
48
49lazy_static! {
50 pub static ref ZED_SERVER_URL: String =
51 std::env::var("ZED_SERVER_URL").unwrap_or_else(|_| "https://zed.dev".to_string());
52 pub static ref IMPERSONATE_LOGIN: Option<String> = std::env::var("ZED_IMPERSONATE")
53 .ok()
54 .and_then(|s| if s.is_empty() { None } else { Some(s) });
55 pub static ref ADMIN_API_TOKEN: Option<String> = std::env::var("ZED_ADMIN_API_TOKEN")
56 .ok()
57 .and_then(|s| if s.is_empty() { None } else { Some(s) });
58 pub static ref ZED_APP_VERSION: Option<SemanticVersion> = std::env::var("ZED_APP_VERSION")
59 .ok()
60 .and_then(|v| v.parse().ok());
61 pub static ref ZED_APP_PATH: Option<PathBuf> =
62 std::env::var("ZED_APP_PATH").ok().map(PathBuf::from);
63 pub static ref ZED_ALWAYS_ACTIVE: bool =
64 std::env::var("ZED_ALWAYS_ACTIVE").map_or(false, |e| e.len() > 0);
65}
66
/// Secret client token string.
/// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
pub const ZED_SECRET_CLIENT_TOKEN: &str = "618033988749894";
/// Starting delay used by the reconnect loop in `Client::set_status`; later
/// delays grow from this value.
pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100);
/// Timer duration used by `Client::authenticate_and_connect` to bound both
/// establishing the connection and waiting for the server's hello message.
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
70
/// Action type registered in `init`; its handler calls
/// `Client::authenticate_and_connect(true, ..)`.
#[derive(Clone, Default, PartialEq, Deserialize)]
pub struct SignIn;
73
/// Action type registered in `init`; its handler calls `Client::disconnect`.
#[derive(Clone, Default, PartialEq, Deserialize)]
pub struct SignOut;
76
/// Action type registered in `init`; its handler calls `Client::reconnect`.
#[derive(Clone, Default, PartialEq, Deserialize)]
pub struct Reconnect;
79
/// Registers `TelemetrySettings` with the settings system.
pub fn init_settings(cx: &mut AppContext) {
    settings2::register::<TelemetrySettings>(cx);
}
83
84pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
85 init_settings(cx);
86
87 let client = Arc::downgrade(client);
88 cx.register_action_type::<SignIn>();
89 cx.on_action({
90 let client = client.clone();
91 move |_: &SignIn, cx| {
92 if let Some(client) = client.upgrade() {
93 cx.spawn(
94 |cx| async move { client.authenticate_and_connect(true, &cx).log_err().await },
95 )
96 .detach();
97 }
98 }
99 });
100
101 cx.register_action_type::<SignOut>();
102 cx.on_action({
103 let client = client.clone();
104 move |_: &SignOut, cx| {
105 if let Some(client) = client.upgrade() {
106 cx.spawn(|cx| async move {
107 client.disconnect(&cx);
108 })
109 .detach();
110 }
111 }
112 });
113
114 cx.register_action_type::<Reconnect>();
115 cx.on_action({
116 let client = client.clone();
117 move |_: &Reconnect, cx| {
118 if let Some(client) = client.upgrade() {
119 cx.spawn(|cx| async move {
120 client.reconnect(&cx);
121 })
122 .detach();
123 }
124 }
125 });
126}
127
/// Client handle bundling the RPC peer, HTTP client, telemetry and the
/// lock-protected connection state.
pub struct Client {
    /// Value returned by `id()`; `authenticate_and_connect` stores the
    /// authenticated user's id here via `set_id`.
    id: AtomicU64,
    /// RPC peer that connections are added to and messages are sent through.
    peer: Arc<Peer>,
    /// HTTP client used for the `/rpc` lookup and admin API requests.
    http: Arc<dyn HttpClient>,
    /// Telemetry handle; its user info is cleared on sign-out in `set_status`.
    telemetry: Arc<Telemetry>,
    /// Credentials, status channel, subscriptions and message handlers.
    state: RwLock<ClientState>,

    /// Test-only override consulted by `authenticate` before falling back to
    /// browser authentication.
    #[allow(clippy::type_complexity)]
    #[cfg(any(test, feature = "test-support"))]
    authenticate: RwLock<
        Option<Box<dyn 'static + Send + Sync + Fn(&AsyncAppContext) -> Task<Result<Credentials>>>>,
    >,

    /// Test-only override consulted by `establish_connection` before falling
    /// back to a websocket connection.
    #[allow(clippy::type_complexity)]
    #[cfg(any(test, feature = "test-support"))]
    establish_connection: RwLock<
        Option<
            Box<
                dyn 'static
                    + Send
                    + Sync
                    + Fn(
                        &Credentials,
                        &AsyncAppContext,
                    ) -> Task<Result<Connection, EstablishConnectionError>>,
            >,
        >,
    >,
}
157
/// Errors produced while establishing a connection to the server.
#[derive(Error, Debug)]
pub enum EstablishConnectionError {
    /// Produced when the websocket handshake answers with HTTP status
    /// `UPGRADE_REQUIRED` (see the `From<WebsocketError>` impl).
    #[error("upgrade required")]
    UpgradeRequired,
    /// Produced when the websocket handshake answers with HTTP status
    /// `UNAUTHORIZED` (see the `From<WebsocketError>` impl).
    #[error("unauthorized")]
    Unauthorized,
    /// Any other error, including websocket errors that are not one of the
    /// two statuses above.
    #[error("{0}")]
    Other(#[from] anyhow::Error),
    /// Error from the HTTP client.
    #[error("{0}")]
    Http(#[from] util::http::Error),
    /// I/O error.
    #[error("{0}")]
    Io(#[from] std::io::Error),
    /// Note: despite the variant name, this wraps an `http::Error`, not a
    /// `WebsocketError`; the latter is converted by the manual `From` impl.
    #[error("{0}")]
    Websocket(#[from] async_tungstenite::tungstenite::http::Error),
}
173
174impl From<WebsocketError> for EstablishConnectionError {
175 fn from(error: WebsocketError) -> Self {
176 if let WebsocketError::Http(response) = &error {
177 match response.status() {
178 StatusCode::UNAUTHORIZED => return EstablishConnectionError::Unauthorized,
179 StatusCode::UPGRADE_REQUIRED => return EstablishConnectionError::UpgradeRequired,
180 _ => {}
181 }
182 }
183 EstablishConnectionError::Other(error.into())
184 }
185}
186
187impl EstablishConnectionError {
188 pub fn other(error: impl Into<anyhow::Error> + Send + Sync) -> Self {
189 Self::Other(error.into())
190 }
191}
192
/// Connection status published through the client's status watch channel.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Status {
    /// Initial status (see `ClientState::default`); also set by `disconnect`.
    SignedOut,
    /// Set when establishing a connection fails with `UpgradeRequired`.
    UpgradeRequired,
    /// Set by `authenticate_and_connect` when starting from `SignedOut`.
    Authenticating,
    /// Set after credentials are obtained when starting from `SignedOut`.
    Connecting,
    /// Set when authentication or connection establishment fails or times out.
    ConnectionError,
    /// Set by `set_connection` once the server hello has been received.
    Connected {
        peer_id: PeerId,
        connection_id: ConnectionId,
    },
    /// Set when connection I/O fails or `reconnect` is called; entering this
    /// status starts the reconnect task in `set_status`.
    ConnectionLost,
    /// Counterpart of `Authenticating` when not starting from `SignedOut`.
    Reauthenticating,
    /// Counterpart of `Connecting` when not starting from `SignedOut`.
    Reconnecting,
    /// Set by the reconnect task between retries.
    ReconnectionError {
        /// Instant at which the next retry is scheduled.
        next_reconnection: Instant,
    },
}
211
212impl Status {
213 pub fn is_connected(&self) -> bool {
214 matches!(self, Self::Connected { .. })
215 }
216
217 pub fn is_signed_out(&self) -> bool {
218 matches!(self, Self::SignedOut | Self::UpgradeRequired)
219 }
220}
221
/// Mutable client state guarded by `Client::state`.
struct ClientState {
    /// Credentials stored after a connection has been established; cleared
    /// when the server rejects them as unauthorized.
    credentials: Option<Credentials>,
    /// Sender/receiver pair of the status watch channel.
    status: (watch::Sender<Status>, watch::Receiver<Status>),
    /// Per message type: function extracting the remote entity id from an envelope.
    entity_id_extractors: HashMap<TypeId, fn(&dyn AnyTypedEnvelope) -> u64>,
    /// Task running the reconnect loop; replacing or clearing it drops the task.
    _reconnect_task: Option<Task<()>>,
    /// Upper bound for the delay between reconnection attempts.
    reconnect_interval: Duration,
    /// Subscribers keyed by (entity type, remote entity id).
    entities_by_type_and_remote_id: HashMap<(TypeId, u64), WeakSubscriber>,
    /// Weak handles registered through `add_message_handler`, keyed by message type.
    models_by_message_type: HashMap<TypeId, AnyWeakHandle>,
    /// Entity type associated with each entity message type.
    entity_types_by_message_type: HashMap<TypeId, TypeId>,
    /// Type-erased handlers keyed by message type.
    #[allow(clippy::type_complexity)]
    message_handlers: HashMap<
        TypeId,
        Arc<
            dyn Send
                + Sync
                + Fn(
                    AnyHandle,
                    Box<dyn AnyTypedEnvelope>,
                    &Arc<Client>,
                    AsyncAppContext,
                ) -> BoxFuture<'static, Result<()>>,
        >,
    >,
}
246
/// Entry stored in `ClientState::entities_by_type_and_remote_id`.
enum WeakSubscriber {
    /// A model has been attached (see `PendingEntitySubscription::set_model`).
    Entity { handle: AnyWeakHandle },
    /// No model attached yet; holds the envelopes that `set_model` later
    /// passes to `handle_message`.
    Pending(Vec<Box<dyn AnyTypedEnvelope>>),
}
251
/// User id and access token; sent together in the `Authorization` header
/// when opening the websocket connection.
#[derive(Clone, Debug)]
pub struct Credentials {
    pub user_id: u64,
    pub access_token: String,
}
257
258impl Default for ClientState {
259 fn default() -> Self {
260 Self {
261 credentials: None,
262 status: watch::channel_with(Status::SignedOut),
263 entity_id_extractors: Default::default(),
264 _reconnect_task: None,
265 reconnect_interval: Duration::from_secs(5),
266 models_by_message_type: Default::default(),
267 entities_by_type_and_remote_id: Default::default(),
268 entity_types_by_message_type: Default::default(),
269 message_handlers: Default::default(),
270 }
271 }
272}
273
/// Handle for a registration made on a `Client`; dropping it removes the
/// corresponding entries from the client's state (see the `Drop` impl).
pub enum Subscription {
    /// Subscription for one remote entity, keyed by (entity type, remote id).
    Entity {
        client: Weak<Client>,
        id: (TypeId, u64),
    },
    /// Subscription for a message type.
    Message {
        client: Weak<Client>,
        id: TypeId,
    },
}
284
285impl Drop for Subscription {
286 fn drop(&mut self) {
287 match self {
288 Subscription::Entity { client, id } => {
289 if let Some(client) = client.upgrade() {
290 let mut state = client.state.write();
291 let _ = state.entities_by_type_and_remote_id.remove(id);
292 }
293 }
294 Subscription::Message { client, id } => {
295 if let Some(client) = client.upgrade() {
296 let mut state = client.state.write();
297 let _ = state.entity_types_by_message_type.remove(id);
298 let _ = state.message_handlers.remove(id);
299 }
300 }
301 }
302 }
303}
304
/// Subscription to a remote entity that has no model attached yet.
/// Created by `Client::subscribe_to_entity` and completed by `set_model`.
pub struct PendingEntitySubscription<T: 'static> {
    client: Arc<Client>,
    remote_id: u64,
    _entity_type: PhantomData<T>,
    /// Set by `set_model`; when still `false` on drop, the pending entry is
    /// removed from the client's state.
    consumed: bool,
}
311
312impl<T> PendingEntitySubscription<T>
313where
314 T: 'static + Send + Sync,
315{
316 pub fn set_model(mut self, model: &Handle<T>, cx: &mut AsyncAppContext) -> Subscription {
317 self.consumed = true;
318 let mut state = self.client.state.write();
319 let id = (TypeId::of::<T>(), self.remote_id);
320 let Some(WeakSubscriber::Pending(messages)) =
321 state.entities_by_type_and_remote_id.remove(&id)
322 else {
323 unreachable!()
324 };
325
326 state.entities_by_type_and_remote_id.insert(
327 id,
328 WeakSubscriber::Entity {
329 handle: model.downgrade().into(),
330 },
331 );
332 drop(state);
333 for message in messages {
334 self.client.handle_message(message, cx);
335 }
336 Subscription::Entity {
337 client: Arc::downgrade(&self.client),
338 id,
339 }
340 }
341}
342
343impl<T> Drop for PendingEntitySubscription<T>
344where
345 T: 'static,
346{
347 fn drop(&mut self) {
348 if !self.consumed {
349 let mut state = self.client.state.write();
350 if let Some(WeakSubscriber::Pending(messages)) = state
351 .entities_by_type_and_remote_id
352 .remove(&(TypeId::of::<T>(), self.remote_id))
353 {
354 for message in messages {
355 log::info!("unhandled message {}", message.payload_type_name());
356 }
357 }
358 }
359 }
360}
361
/// Resolved telemetry settings, loaded under the `"telemetry"` settings key.
#[derive(Copy, Clone)]
pub struct TelemetrySettings {
    pub diagnostics: bool,
    pub metrics: bool,
}
367
/// File representation of `TelemetrySettings`; each field may be omitted.
#[derive(Default, Clone, Serialize, Deserialize, JsonSchema)]
pub struct TelemetrySettingsContent {
    pub diagnostics: Option<bool>,
    pub metrics: Option<bool>,
}
373
374impl settings2::Setting for TelemetrySettings {
375 const KEY: Option<&'static str> = Some("telemetry");
376
377 type FileContent = TelemetrySettingsContent;
378
379 fn load(
380 default_value: &Self::FileContent,
381 user_values: &[&Self::FileContent],
382 _: &mut AppContext,
383 ) -> Result<Self> {
384 Ok(Self {
385 diagnostics: user_values.first().and_then(|v| v.diagnostics).unwrap_or(
386 default_value
387 .diagnostics
388 .ok_or_else(Self::missing_default)?,
389 ),
390 metrics: user_values
391 .first()
392 .and_then(|v| v.metrics)
393 .unwrap_or(default_value.metrics.ok_or_else(Self::missing_default)?),
394 })
395 }
396}
397
398impl Client {
399 pub fn new(http: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
400 Arc::new(Self {
401 id: AtomicU64::new(0),
402 peer: Peer::new(0),
403 telemetry: Telemetry::new(http.clone(), cx),
404 http,
405 state: Default::default(),
406
407 #[cfg(any(test, feature = "test-support"))]
408 authenticate: Default::default(),
409 #[cfg(any(test, feature = "test-support"))]
410 establish_connection: Default::default(),
411 })
412 }
413
414 pub fn id(&self) -> u64 {
415 self.id.load(std::sync::atomic::Ordering::SeqCst)
416 }
417
418 pub fn http_client(&self) -> Arc<dyn HttpClient> {
419 self.http.clone()
420 }
421
422 pub fn set_id(&self, id: u64) -> &Self {
423 self.id.store(id, std::sync::atomic::Ordering::SeqCst);
424 self
425 }
426
427 #[cfg(any(test, feature = "test-support"))]
428 pub fn teardown(&self) {
429 let mut state = self.state.write();
430 state._reconnect_task.take();
431 state.message_handlers.clear();
432 state.models_by_message_type.clear();
433 state.entities_by_type_and_remote_id.clear();
434 state.entity_id_extractors.clear();
435 self.peer.teardown();
436 }
437
438 #[cfg(any(test, feature = "test-support"))]
439 pub fn override_authenticate<F>(&self, authenticate: F) -> &Self
440 where
441 F: 'static + Send + Sync + Fn(&AsyncAppContext) -> Task<Result<Credentials>>,
442 {
443 *self.authenticate.write() = Some(Box::new(authenticate));
444 self
445 }
446
447 #[cfg(any(test, feature = "test-support"))]
448 pub fn override_establish_connection<F>(&self, connect: F) -> &Self
449 where
450 F: 'static
451 + Send
452 + Sync
453 + Fn(&Credentials, &AsyncAppContext) -> Task<Result<Connection, EstablishConnectionError>>,
454 {
455 *self.establish_connection.write() = Some(Box::new(connect));
456 self
457 }
458
459 pub fn user_id(&self) -> Option<u64> {
460 self.state
461 .read()
462 .credentials
463 .as_ref()
464 .map(|credentials| credentials.user_id)
465 }
466
467 pub fn peer_id(&self) -> Option<PeerId> {
468 if let Status::Connected { peer_id, .. } = &*self.status().borrow() {
469 Some(*peer_id)
470 } else {
471 None
472 }
473 }
474
475 pub fn status(&self) -> watch::Receiver<Status> {
476 self.state.read().status.1.clone()
477 }
478
    /// Publishes `status` on the status channel and performs the bookkeeping
    /// tied to the new status:
    ///
    /// * `Connected` drops any stored reconnect task.
    /// * `ConnectionLost` spawns and stores a task that retries
    ///   `authenticate_and_connect` until it succeeds or the status stops
    ///   being `ConnectionError`.
    /// * `SignedOut` / `UpgradeRequired` clear the telemetry user info and
    ///   drop the reconnect task.
    ///
    /// The state write lock is held for the whole function body.
    fn set_status(self: &Arc<Self>, status: Status, cx: &AsyncAppContext) {
        log::info!("set status on client {}: {:?}", self.id(), status);
        let mut state = self.state.write();
        *state.status.0.borrow_mut() = status;

        match status {
            Status::Connected { .. } => {
                state._reconnect_task = None;
            }
            Status::ConnectionLost => {
                let this = self.clone();
                let reconnect_interval = state.reconnect_interval;
                state._reconnect_task = Some(cx.spawn(move |cx| async move {
                    // Test builds use a fixed seed; other builds seed from entropy.
                    #[cfg(any(test, feature = "test-support"))]
                    let mut rng = StdRng::seed_from_u64(0);
                    #[cfg(not(any(test, feature = "test-support")))]
                    let mut rng = StdRng::from_entropy();

                    let mut delay = INITIAL_RECONNECTION_DELAY;
                    while let Err(error) = this.authenticate_and_connect(true, &cx).await {
                        log::error!("failed to connect {}", error);
                        // Only keep retrying while the failure left the client in
                        // `ConnectionError`; any other status ends the loop.
                        if matches!(*this.status().borrow(), Status::ConnectionError) {
                            this.set_status(
                                Status::ReconnectionError {
                                    next_reconnection: Instant::now() + delay,
                                },
                                &cx,
                            );
                            cx.executor().timer(delay).await;
                            // Grow the delay by a random factor in [1, 2], capped at
                            // `reconnect_interval`.
                            delay = delay
                                .mul_f32(rng.gen_range(1.0..=2.0))
                                .min(reconnect_interval);
                        } else {
                            break;
                        }
                    }
                }));
            }
            Status::SignedOut | Status::UpgradeRequired => {
                // NOTE(review): `cx.update` runs while the state write lock is held —
                // confirm the telemetry call never re-enters this client's state.
                cx.update(|cx| self.telemetry.set_authenticated_user_info(None, false, cx))
                    .log_err();
                state._reconnect_task.take();
            }
            _ => {}
        }
    }
525
526 pub fn subscribe_to_entity<T>(
527 self: &Arc<Self>,
528 remote_id: u64,
529 ) -> Result<PendingEntitySubscription<T>>
530 where
531 T: 'static + Send + Sync,
532 {
533 let id = (TypeId::of::<T>(), remote_id);
534
535 let mut state = self.state.write();
536 if state.entities_by_type_and_remote_id.contains_key(&id) {
537 return Err(anyhow!("already subscribed to entity"));
538 } else {
539 state
540 .entities_by_type_and_remote_id
541 .insert(id, WeakSubscriber::Pending(Default::default()));
542 Ok(PendingEntitySubscription {
543 client: self.clone(),
544 remote_id,
545 consumed: false,
546 _entity_type: PhantomData,
547 })
548 }
549 }
550
551 #[track_caller]
552 pub fn add_message_handler<M, E, H, F>(
553 self: &Arc<Self>,
554 entity: WeakHandle<E>,
555 handler: H,
556 ) -> Subscription
557 where
558 M: EnvelopedMessage,
559 E: 'static + Send + Sync,
560 H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
561 F: 'static + Future<Output = Result<()>> + Send,
562 {
563 let message_type_id = TypeId::of::<M>();
564
565 let mut state = self.state.write();
566 state
567 .models_by_message_type
568 .insert(message_type_id, entity.into());
569
570 let prev_handler = state.message_handlers.insert(
571 message_type_id,
572 Arc::new(move |subscriber, envelope, client, cx| {
573 let subscriber = subscriber.downcast::<E>().unwrap();
574 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
575 handler(subscriber, *envelope, client.clone(), cx).boxed()
576 }),
577 );
578 if prev_handler.is_some() {
579 let location = std::panic::Location::caller();
580 panic!(
581 "{}:{} registered handler for the same message {} twice",
582 location.file(),
583 location.line(),
584 std::any::type_name::<M>()
585 );
586 }
587
588 Subscription::Message {
589 client: Arc::downgrade(self),
590 id: message_type_id,
591 }
592 }
593
594 pub fn add_request_handler<M, E, H, F>(
595 self: &Arc<Self>,
596 model: WeakHandle<E>,
597 handler: H,
598 ) -> Subscription
599 where
600 M: RequestMessage,
601 E: 'static + Send + Sync,
602 H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
603 F: 'static + Future<Output = Result<M::Response>> + Send,
604 {
605 self.add_message_handler(model, move |handle, envelope, this, cx| {
606 Self::respond_to_request(
607 envelope.receipt(),
608 handler(handle, envelope, this.clone(), cx),
609 this,
610 )
611 })
612 }
613
614 pub fn add_model_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
615 where
616 M: EntityMessage,
617 E: 'static + Send + Sync,
618 H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
619 F: 'static + Future<Output = Result<()>> + Send,
620 {
621 self.add_entity_message_handler::<M, E, _, _>(move |subscriber, message, client, cx| {
622 handler(subscriber.downcast::<E>().unwrap(), message, client, cx)
623 })
624 }
625
626 fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
627 where
628 M: EntityMessage,
629 E: 'static + Send + Sync,
630 H: 'static + Send + Sync + Fn(AnyHandle, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
631 F: 'static + Future<Output = Result<()>> + Send,
632 {
633 let model_type_id = TypeId::of::<E>();
634 let message_type_id = TypeId::of::<M>();
635
636 let mut state = self.state.write();
637 state
638 .entity_types_by_message_type
639 .insert(message_type_id, model_type_id);
640 state
641 .entity_id_extractors
642 .entry(message_type_id)
643 .or_insert_with(|| {
644 |envelope| {
645 envelope
646 .as_any()
647 .downcast_ref::<TypedEnvelope<M>>()
648 .unwrap()
649 .payload
650 .remote_entity_id()
651 }
652 });
653 let prev_handler = state.message_handlers.insert(
654 message_type_id,
655 Arc::new(move |handle, envelope, client, cx| {
656 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
657 handler(handle, *envelope, client.clone(), cx).boxed()
658 }),
659 );
660 if prev_handler.is_some() {
661 panic!("registered handler for the same message twice");
662 }
663 }
664
665 pub fn add_model_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
666 where
667 M: EntityMessage + RequestMessage,
668 E: 'static + Send + Sync,
669 H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
670 F: 'static + Future<Output = Result<M::Response>> + Send,
671 {
672 self.add_model_message_handler(move |entity, envelope, client, cx| {
673 Self::respond_to_request::<M, _>(
674 envelope.receipt(),
675 handler(entity, envelope, client.clone(), cx),
676 client,
677 )
678 })
679 }
680
681 async fn respond_to_request<T: RequestMessage, F: Future<Output = Result<T::Response>>>(
682 receipt: Receipt<T>,
683 response: F,
684 client: Arc<Self>,
685 ) -> Result<()> {
686 match response.await {
687 Ok(response) => {
688 client.respond(receipt, response)?;
689 Ok(())
690 }
691 Err(error) => {
692 client.respond_with_error(
693 receipt,
694 proto::Error {
695 message: format!("{:?}", error),
696 },
697 )?;
698 Err(error)
699 }
700 }
701 }
702
703 pub async fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool {
704 read_credentials_from_keychain(cx).await.is_some()
705 }
706
707 #[async_recursion]
708 pub async fn authenticate_and_connect(
709 self: &Arc<Self>,
710 try_keychain: bool,
711 cx: &AsyncAppContext,
712 ) -> anyhow::Result<()> {
713 let was_disconnected = match *self.status().borrow() {
714 Status::SignedOut => true,
715 Status::ConnectionError
716 | Status::ConnectionLost
717 | Status::Authenticating { .. }
718 | Status::Reauthenticating { .. }
719 | Status::ReconnectionError { .. } => false,
720 Status::Connected { .. } | Status::Connecting { .. } | Status::Reconnecting { .. } => {
721 return Ok(())
722 }
723 Status::UpgradeRequired => return Err(EstablishConnectionError::UpgradeRequired)?,
724 };
725
726 if was_disconnected {
727 self.set_status(Status::Authenticating, cx);
728 } else {
729 self.set_status(Status::Reauthenticating, cx)
730 }
731
732 let mut read_from_keychain = false;
733 let mut credentials = self.state.read().credentials.clone();
734 if credentials.is_none() && try_keychain {
735 credentials = read_credentials_from_keychain(cx).await;
736 read_from_keychain = credentials.is_some();
737 }
738 if credentials.is_none() {
739 let mut status_rx = self.status();
740 let _ = status_rx.next().await;
741 futures::select_biased! {
742 authenticate = self.authenticate(cx).fuse() => {
743 match authenticate {
744 Ok(creds) => credentials = Some(creds),
745 Err(err) => {
746 self.set_status(Status::ConnectionError, cx);
747 return Err(err);
748 }
749 }
750 }
751 _ = status_rx.next().fuse() => {
752 return Err(anyhow!("authentication canceled"));
753 }
754 }
755 }
756 let credentials = credentials.unwrap();
757 self.set_id(credentials.user_id);
758
759 if was_disconnected {
760 self.set_status(Status::Connecting, cx);
761 } else {
762 self.set_status(Status::Reconnecting, cx);
763 }
764
765 let mut timeout = futures::FutureExt::fuse(cx.executor().timer(CONNECTION_TIMEOUT));
766 futures::select_biased! {
767 connection = self.establish_connection(&credentials, cx).fuse() => {
768 match connection {
769 Ok(conn) => {
770 self.state.write().credentials = Some(credentials.clone());
771 if !read_from_keychain && IMPERSONATE_LOGIN.is_none() {
772 write_credentials_to_keychain(credentials, cx).log_err();
773 }
774
775 futures::select_biased! {
776 result = self.set_connection(conn, cx).fuse() => result,
777 _ = timeout => {
778 self.set_status(Status::ConnectionError, cx);
779 Err(anyhow!("timed out waiting on hello message from server"))
780 }
781 }
782 }
783 Err(EstablishConnectionError::Unauthorized) => {
784 self.state.write().credentials.take();
785 if read_from_keychain {
786 delete_credentials_from_keychain(cx).log_err();
787 self.set_status(Status::SignedOut, cx);
788 self.authenticate_and_connect(false, cx).await
789 } else {
790 self.set_status(Status::ConnectionError, cx);
791 Err(EstablishConnectionError::Unauthorized)?
792 }
793 }
794 Err(EstablishConnectionError::UpgradeRequired) => {
795 self.set_status(Status::UpgradeRequired, cx);
796 Err(EstablishConnectionError::UpgradeRequired)?
797 }
798 Err(error) => {
799 self.set_status(Status::ConnectionError, cx);
800 Err(error)?
801 }
802 }
803 }
804 _ = &mut timeout => {
805 self.set_status(Status::ConnectionError, cx);
806 Err(anyhow!("timed out trying to establish connection"))
807 }
808 }
809 }
810
    /// Adds `conn` to the peer, waits for the server's hello message, marks
    /// the client as `Connected`, and spawns two detached tasks: one that
    /// passes incoming messages to `handle_message`, and one that updates the
    /// status when the connection's I/O future finishes.
    ///
    /// # Errors
    ///
    /// Fails (after disconnecting the new connection from the peer) when no
    /// hello message arrives, the first message is not a `proto::Hello`, or
    /// the hello carries no peer id.
    async fn set_connection(
        self: &Arc<Self>,
        conn: Connection,
        cx: &AsyncAppContext,
    ) -> Result<()> {
        let executor = cx.executor();
        log::info!("add connection to peer");
        let (connection_id, handle_io, mut incoming) = self.peer.add_connection(conn, {
            let executor = executor.clone();
            move |duration| executor.timer(duration)
        });
        let handle_io = executor.spawn(handle_io);

        // The first incoming message must be the server's hello, carrying our peer id.
        let peer_id = async {
            log::info!("waiting for server hello");
            let message = incoming
                .next()
                .await
                .ok_or_else(|| anyhow!("no hello message received"))?;
            log::info!("got server hello");
            let hello_message_type_name = message.payload_type_name().to_string();
            let hello = message
                .into_any()
                .downcast::<TypedEnvelope<proto::Hello>>()
                .map_err(|_| {
                    anyhow!(
                        "invalid hello message received: {:?}",
                        hello_message_type_name
                    )
                })?;
            let peer_id = hello
                .payload
                .peer_id
                .ok_or_else(|| anyhow!("invalid peer id"))?;
            Ok(peer_id)
        };

        let peer_id = match peer_id.await {
            Ok(peer_id) => peer_id,
            Err(error) => {
                self.peer.disconnect(connection_id);
                return Err(error);
            }
        };

        log::info!(
            "set status to connected (connection id: {:?}, peer id: {:?})",
            connection_id,
            peer_id
        );
        self.set_status(
            Status::Connected {
                peer_id,
                connection_id,
            },
            cx,
        );

        // Pass every subsequent incoming message to `handle_message`.
        cx.spawn({
            let this = self.clone();
            |cx| {
                async move {
                    while let Some(message) = incoming.next().await {
                        this.handle_message(message, &cx);
                        // Don't starve the main thread when receiving lots of messages at once.
                        smol::future::yield_now().await;
                    }
                }
            }
        })
        .detach();

        // Update the status once the connection's I/O future finishes.
        cx.spawn({
            let this = self.clone();
            move |cx| async move {
                match handle_io.await {
                    Ok(()) => {
                        // Only sign out when the status still refers to this exact
                        // connection.
                        if this.status().borrow().clone()
                            == (Status::Connected {
                                connection_id,
                                peer_id,
                            })
                        {
                            this.set_status(Status::SignedOut, &cx);
                        }
                    }
                    Err(err) => {
                        log::error!("connection error: {:?}", err);
                        this.set_status(Status::ConnectionLost, &cx);
                    }
                }
            }
        })
        .detach();

        Ok(())
    }
908
    /// Obtains credentials via the browser sign-in flow; in test builds an
    /// installed `override_authenticate` callback takes precedence.
    fn authenticate(self: &Arc<Self>, cx: &AsyncAppContext) -> Task<Result<Credentials>> {
        #[cfg(any(test, feature = "test-support"))]
        if let Some(callback) = self.authenticate.read().as_ref() {
            return callback(cx);
        }

        self.authenticate_with_browser(cx)
    }
917
    /// Opens a websocket connection using `credentials`; in test builds an
    /// installed `override_establish_connection` callback takes precedence.
    fn establish_connection(
        self: &Arc<Self>,
        credentials: &Credentials,
        cx: &AsyncAppContext,
    ) -> Task<Result<Connection, EstablishConnectionError>> {
        #[cfg(any(test, feature = "test-support"))]
        if let Some(callback) = self.establish_connection.read().as_ref() {
            return callback(credentials, cx);
        }

        self.establish_websocket_connection(credentials, cx)
    }
930
931 async fn get_rpc_url(http: Arc<dyn HttpClient>, is_preview: bool) -> Result<Url> {
932 let preview_param = if is_preview { "?preview=1" } else { "" };
933 let url = format!("{}/rpc{preview_param}", *ZED_SERVER_URL);
934 let response = http.get(&url, Default::default(), false).await?;
935
936 // Normally, ZED_SERVER_URL is set to the URL of zed.dev website.
937 // The website's /rpc endpoint redirects to a collab server's /rpc endpoint,
938 // which requires authorization via an HTTP header.
939 //
940 // For testing purposes, ZED_SERVER_URL can also set to the direct URL of
941 // of a collab server. In that case, a request to the /rpc endpoint will
942 // return an 'unauthorized' response.
943 let collab_url = if response.status().is_redirection() {
944 response
945 .headers()
946 .get("Location")
947 .ok_or_else(|| anyhow!("missing location header in /rpc response"))?
948 .to_str()
949 .map_err(EstablishConnectionError::other)?
950 .to_string()
951 } else if response.status() == StatusCode::UNAUTHORIZED {
952 url
953 } else {
954 Err(anyhow!(
955 "unexpected /rpc response status {}",
956 response.status()
957 ))?
958 };
959
960 Url::parse(&collab_url).context("invalid rpc url")
961 }
962
    /// Spawns a task that resolves the RPC URL, opens a TCP connection to its
    /// host, and performs the websocket handshake, sending the credentials in
    /// the `Authorization` header and the protocol version in
    /// `x-zed-protocol-version`.
    ///
    /// `https` URLs are upgraded to `wss` (TLS) and `http` URLs to `ws`; any
    /// other scheme is an error.
    fn establish_websocket_connection(
        self: &Arc<Self>,
        credentials: &Credentials,
        cx: &AsyncAppContext,
    ) -> Task<Result<Connection, EstablishConnectionError>> {
        // Ask for the preview server on every release channel except stable;
        // defaults to `false` when the global cannot be read.
        let use_preview_server = cx
            .try_read_global(|channel: &ReleaseChannel, _| *channel != ReleaseChannel::Stable)
            .unwrap_or(false);

        let request = Request::builder()
            .header(
                "Authorization",
                format!("{} {}", credentials.user_id, credentials.access_token),
            )
            .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION);

        let http = self.http.clone();
        cx.executor().spawn(async move {
            let mut rpc_url = Self::get_rpc_url(http, use_preview_server).await?;
            let rpc_host = rpc_url
                .host_str()
                .zip(rpc_url.port_or_known_default())
                .ok_or_else(|| anyhow!("missing host in rpc url"))?;
            let stream = smol::net::TcpStream::connect(rpc_host).await?;

            log::info!("connected to rpc endpoint {}", rpc_url);

            match rpc_url.scheme() {
                "https" => {
                    rpc_url.set_scheme("wss").unwrap();
                    let request = request.uri(rpc_url.as_str()).body(())?;
                    let (stream, _) =
                        async_tungstenite::async_tls::client_async_tls(request, stream).await?;
                    Ok(Connection::new(
                        stream
                            .map_err(|error| anyhow!(error))
                            .sink_map_err(|error| anyhow!(error)),
                    ))
                }
                "http" => {
                    rpc_url.set_scheme("ws").unwrap();
                    let request = request.uri(rpc_url.as_str()).body(())?;
                    let (stream, _) = async_tungstenite::client_async(request, stream).await?;
                    Ok(Connection::new(
                        stream
                            .map_err(|error| anyhow!(error))
                            .sink_map_err(|error| anyhow!(error)),
                    ))
                }
                _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?,
            }
        })
    }
1016
1017 pub fn authenticate_with_browser(
1018 self: &Arc<Self>,
1019 cx: &AsyncAppContext,
1020 ) -> Task<Result<Credentials>> {
1021 let http = self.http.clone();
1022 cx.spawn(|cx| async move {
1023 // Generate a pair of asymmetric encryption keys. The public key will be used by the
1024 // zed server to encrypt the user's access token, so that it can'be intercepted by
1025 // any other app running on the user's device.
1026 let (public_key, private_key) =
1027 rpc::auth::keypair().expect("failed to generate keypair for auth");
1028 let public_key_string =
1029 String::try_from(public_key).expect("failed to serialize public key for auth");
1030
1031 if let Some((login, token)) = IMPERSONATE_LOGIN.as_ref().zip(ADMIN_API_TOKEN.as_ref()) {
1032 return Self::authenticate_as_admin(http, login.clone(), token.clone()).await;
1033 }
1034
1035 // Start an HTTP server to receive the redirect from Zed's sign-in page.
1036 let server = tiny_http::Server::http("127.0.0.1:0").expect("failed to find open port");
1037 let port = server.server_addr().port();
1038
1039 // Open the Zed sign-in page in the user's browser, with query parameters that indicate
1040 // that the user is signing in from a Zed app running on the same device.
1041 let mut url = format!(
1042 "{}/native_app_signin?native_app_port={}&native_app_public_key={}",
1043 *ZED_SERVER_URL, port, public_key_string
1044 );
1045
1046 if let Some(impersonate_login) = IMPERSONATE_LOGIN.as_ref() {
1047 log::info!("impersonating user @{}", impersonate_login);
1048 write!(&mut url, "&impersonate={}", impersonate_login).unwrap();
1049 }
1050
1051 cx.run_on_main(move |cx| cx.open_url(&url))?.await;
1052
1053 // Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
1054 // access token from the query params.
1055 //
1056 // TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a
1057 // custom URL scheme instead of this local HTTP server.
1058 let (user_id, access_token) = cx
1059 .spawn(|_| async move {
1060 for _ in 0..100 {
1061 if let Some(req) = server.recv_timeout(Duration::from_secs(1))? {
1062 let path = req.url();
1063 let mut user_id = None;
1064 let mut access_token = None;
1065 let url = Url::parse(&format!("http://example.com{}", path))
1066 .context("failed to parse login notification url")?;
1067 for (key, value) in url.query_pairs() {
1068 if key == "access_token" {
1069 access_token = Some(value.to_string());
1070 } else if key == "user_id" {
1071 user_id = Some(value.to_string());
1072 }
1073 }
1074
1075 let post_auth_url =
1076 format!("{}/native_app_signin_succeeded", *ZED_SERVER_URL);
1077 req.respond(
1078 tiny_http::Response::empty(302).with_header(
1079 tiny_http::Header::from_bytes(
1080 &b"Location"[..],
1081 post_auth_url.as_bytes(),
1082 )
1083 .unwrap(),
1084 ),
1085 )
1086 .context("failed to respond to login http request")?;
1087 return Ok((
1088 user_id.ok_or_else(|| anyhow!("missing user_id parameter"))?,
1089 access_token
1090 .ok_or_else(|| anyhow!("missing access_token parameter"))?,
1091 ));
1092 }
1093 }
1094
1095 Err(anyhow!("didn't receive login redirect"))
1096 })
1097 .await?;
1098
1099 let access_token = private_key
1100 .decrypt_string(&access_token)
1101 .context("failed to decrypt access token")?;
1102 cx.run_on_main(|cx| cx.activate(true))?.await;
1103
1104 Ok(Credentials {
1105 user_id: user_id.parse()?,
1106 access_token,
1107 })
1108 })
1109 }
1110
1111 async fn authenticate_as_admin(
1112 http: Arc<dyn HttpClient>,
1113 login: String,
1114 mut api_token: String,
1115 ) -> Result<Credentials> {
1116 #[derive(Deserialize)]
1117 struct AuthenticatedUserResponse {
1118 user: User,
1119 }
1120
1121 #[derive(Deserialize)]
1122 struct User {
1123 id: u64,
1124 }
1125
1126 // Use the collab server's admin API to retrieve the id
1127 // of the impersonated user.
1128 let mut url = Self::get_rpc_url(http.clone(), false).await?;
1129 url.set_path("/user");
1130 url.set_query(Some(&format!("github_login={login}")));
1131 let request = Request::get(url.as_str())
1132 .header("Authorization", format!("token {api_token}"))
1133 .body("".into())?;
1134
1135 let mut response = http.send(request).await?;
1136 let mut body = String::new();
1137 response.body_mut().read_to_string(&mut body).await?;
1138 if !response.status().is_success() {
1139 Err(anyhow!(
1140 "admin user request failed {} - {}",
1141 response.status().as_u16(),
1142 body,
1143 ))?;
1144 }
1145 let response: AuthenticatedUserResponse = serde_json::from_str(&body)?;
1146
1147 // Use the admin API token to authenticate as the impersonated user.
1148 api_token.insert_str(0, "ADMIN_TOKEN:");
1149 Ok(Credentials {
1150 user_id: response.user.id,
1151 access_token: api_token,
1152 })
1153 }
1154
    /// Tears down the peer and sets the client status to [`Status::SignedOut`].
    pub fn disconnect(self: &Arc<Self>, cx: &AsyncAppContext) {
        self.peer.teardown();
        self.set_status(Status::SignedOut, cx);
    }
1159
    /// Tears down the peer and sets the client status to [`Status::ConnectionLost`].
    pub fn reconnect(self: &Arc<Self>, cx: &AsyncAppContext) {
        self.peer.teardown();
        self.set_status(Status::ConnectionLost, cx);
    }
1164
1165 fn connection_id(&self) -> Result<ConnectionId> {
1166 if let Status::Connected { connection_id, .. } = *self.status().borrow() {
1167 Ok(connection_id)
1168 } else {
1169 Err(anyhow!("not connected"))
1170 }
1171 }
1172
1173 pub fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
1174 log::debug!("rpc send. client_id:{}, name:{}", self.id(), T::NAME);
1175 self.peer.send(self.connection_id()?, message)
1176 }
1177
1178 pub fn request<T: RequestMessage>(
1179 &self,
1180 request: T,
1181 ) -> impl Future<Output = Result<T::Response>> {
1182 self.request_envelope(request)
1183 .map_ok(|envelope| envelope.payload)
1184 }
1185
1186 pub fn request_envelope<T: RequestMessage>(
1187 &self,
1188 request: T,
1189 ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
1190 let client_id = self.id();
1191 log::debug!(
1192 "rpc request start. client_id:{}. name:{}",
1193 client_id,
1194 T::NAME
1195 );
1196 let response = self
1197 .connection_id()
1198 .map(|conn_id| self.peer.request_envelope(conn_id, request));
1199 async move {
1200 let response = response?.await;
1201 log::debug!(
1202 "rpc request finish. client_id:{}. name:{}",
1203 client_id,
1204 T::NAME
1205 );
1206 response
1207 }
1208 }
1209
    /// Logs the response at debug level and forwards `response` for the request
    /// identified by `receipt` to the peer.
    fn respond<T: RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) -> Result<()> {
        log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME);
        self.peer.respond(receipt, response)
    }
1214
    /// Logs at debug level and forwards `error` as the reply to the request
    /// identified by `receipt`.
    ///
    /// Note that the log line is the same "rpc respond" message used by `respond`.
    fn respond_with_error<T: RequestMessage>(
        &self,
        receipt: Receipt<T>,
        error: proto::Error,
    ) -> Result<()> {
        log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME);
        self.peer.respond_with_error(receipt, error)
    }
1223
1224 fn handle_message(
1225 self: &Arc<Client>,
1226 message: Box<dyn AnyTypedEnvelope>,
1227 cx: &AsyncAppContext,
1228 ) {
1229 let mut state = self.state.write();
1230 let type_name = message.payload_type_name();
1231 let payload_type_id = message.payload_type_id();
1232 let sender_id = message.original_sender_id();
1233
1234 let mut subscriber = None;
1235
1236 if let Some(handle) = state
1237 .models_by_message_type
1238 .get(&payload_type_id)
1239 .and_then(|handle| handle.upgrade())
1240 {
1241 subscriber = Some(handle);
1242 } else if let Some((extract_entity_id, entity_type_id)) =
1243 state.entity_id_extractors.get(&payload_type_id).zip(
1244 state
1245 .entity_types_by_message_type
1246 .get(&payload_type_id)
1247 .copied(),
1248 )
1249 {
1250 let entity_id = (extract_entity_id)(message.as_ref());
1251
1252 match state
1253 .entities_by_type_and_remote_id
1254 .get_mut(&(entity_type_id, entity_id))
1255 {
1256 Some(WeakSubscriber::Pending(pending)) => {
1257 pending.push(message);
1258 return;
1259 }
1260 Some(weak_subscriber @ _) => match weak_subscriber {
1261 WeakSubscriber::Entity { handle } => {
1262 subscriber = handle.upgrade();
1263 }
1264
1265 WeakSubscriber::Pending(_) => {}
1266 },
1267 _ => {}
1268 }
1269 }
1270
1271 let subscriber = if let Some(subscriber) = subscriber {
1272 subscriber
1273 } else {
1274 log::info!("unhandled message {}", type_name);
1275 self.peer.respond_with_unhandled_message(message).log_err();
1276 return;
1277 };
1278
1279 let handler = state.message_handlers.get(&payload_type_id).cloned();
1280 // Dropping the state prevents deadlocks if the handler interacts with rpc::Client.
1281 // It also ensures we don't hold the lock while yielding back to the executor, as
1282 // that might cause the executor thread driving this future to block indefinitely.
1283 drop(state);
1284
1285 if let Some(handler) = handler {
1286 let future = handler(subscriber, message, &self, cx.clone());
1287 let client_id = self.id();
1288 log::debug!(
1289 "rpc message received. client_id:{}, sender_id:{:?}, type:{}",
1290 client_id,
1291 sender_id,
1292 type_name
1293 );
1294 cx.spawn_on_main(move |_| async move {
1295 match future.await {
1296 Ok(()) => {
1297 log::debug!(
1298 "rpc message handled. client_id:{}, sender_id:{:?}, type:{}",
1299 client_id,
1300 sender_id,
1301 type_name
1302 );
1303 }
1304 Err(error) => {
1305 log::error!(
1306 "error handling message. client_id:{}, sender_id:{:?}, type:{}, error:{:?}",
1307 client_id,
1308 sender_id,
1309 type_name,
1310 error
1311 );
1312 }
1313 }
1314 })
1315 .detach();
1316 } else {
1317 log::info!("unhandled message {}", type_name);
1318 self.peer.respond_with_unhandled_message(message).log_err();
1319 }
1320 }
1321
    /// Returns a reference to the shared [`Telemetry`] handle held by this client.
    pub fn telemetry(&self) -> &Arc<Telemetry> {
        &self.telemetry
    }
1325}
1326
1327async fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credentials> {
1328 if IMPERSONATE_LOGIN.is_some() {
1329 return None;
1330 }
1331
1332 let (user_id, access_token) = cx
1333 .run_on_main(|cx| cx.read_credentials(&ZED_SERVER_URL).log_err().flatten())
1334 .ok()?
1335 .await?;
1336
1337 Some(Credentials {
1338 user_id: user_id.parse().ok()?,
1339 access_token: String::from_utf8(access_token).ok()?,
1340 })
1341}
1342
1343async fn write_credentials_to_keychain(
1344 credentials: Credentials,
1345 cx: &AsyncAppContext,
1346) -> Result<()> {
1347 cx.run_on_main(move |cx| {
1348 cx.write_credentials(
1349 &ZED_SERVER_URL,
1350 &credentials.user_id.to_string(),
1351 credentials.access_token.as_bytes(),
1352 )
1353 })?
1354 .await
1355}
1356
1357async fn delete_credentials_from_keychain(cx: &AsyncAppContext) -> Result<()> {
1358 cx.run_on_main(move |cx| cx.delete_credentials(&ZED_SERVER_URL))?
1359 .await
1360}
1361
/// Prefix shared by every worktree URL; see [`encode_worktree_url`] and
/// [`decode_worktree_url`].
const WORKTREE_URL_PREFIX: &str = "zed://worktrees/";
1363
1364pub fn encode_worktree_url(id: u64, access_token: &str) -> String {
1365 format!("{}{}/{}", WORKTREE_URL_PREFIX, id, access_token)
1366}
1367
1368pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> {
1369 let path = url.trim().strip_prefix(WORKTREE_URL_PREFIX)?;
1370 let mut parts = path.split('/');
1371 let id = parts.next()?.parse::<u64>().ok()?;
1372 let access_token = parts.next()?;
1373 if access_token.is_empty() {
1374 return None;
1375 }
1376 Some((id, access_token.to_string()))
1377}
1378
1379// #[cfg(test)]
1380// mod tests {
1381// use super::*;
1382// use crate::test::FakeServer;
1383// use gpui::{executor::Deterministic, TestAppContext};
1384// use parking_lot::Mutex;
1385// use std::future;
1386// use util::http::FakeHttpClient;
1387
1388// #[gpui::test(iterations = 10)]
1389// async fn test_reconnection(cx: &mut TestAppContext) {
1390// cx.foreground().forbid_parking();
1391
1392// let user_id = 5;
1393// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1394// let server = FakeServer::for_client(user_id, &client, cx).await;
1395// let mut status = client.status();
1396// assert!(matches!(
1397// status.next().await,
1398// Some(Status::Connected { .. })
1399// ));
1400// assert_eq!(server.auth_count(), 1);
1401
1402// server.forbid_connections();
1403// server.disconnect();
1404// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
1405
1406// server.allow_connections();
1407// cx.foreground().advance_clock(Duration::from_secs(10));
1408// while !matches!(status.next().await, Some(Status::Connected { .. })) {}
1409// assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting
1410
1411// server.forbid_connections();
1412// server.disconnect();
1413// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
1414
1415// // Clear cached credentials after authentication fails
1416// server.roll_access_token();
1417// server.allow_connections();
1418// cx.foreground().advance_clock(Duration::from_secs(10));
1419// while !matches!(status.next().await, Some(Status::Connected { .. })) {}
1420// assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
1421// }
1422
1423// #[gpui::test(iterations = 10)]
1424// async fn test_connection_timeout(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
1425// deterministic.forbid_parking();
1426
1427// let user_id = 5;
1428// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1429// let mut status = client.status();
1430
1431// // Time out when client tries to connect.
1432// client.override_authenticate(move |cx| {
1433// cx.foreground().spawn(async move {
1434// Ok(Credentials {
1435// user_id,
1436// access_token: "token".into(),
1437// })
1438// })
1439// });
1440// client.override_establish_connection(|_, cx| {
1441// cx.foreground().spawn(async move {
1442// future::pending::<()>().await;
1443// unreachable!()
1444// })
1445// });
1446// let auth_and_connect = cx.spawn({
1447// let client = client.clone();
1448// |cx| async move { client.authenticate_and_connect(false, &cx).await }
1449// });
1450// deterministic.run_until_parked();
1451// assert!(matches!(status.next().await, Some(Status::Connecting)));
1452
1453// deterministic.advance_clock(CONNECTION_TIMEOUT);
1454// assert!(matches!(
1455// status.next().await,
1456// Some(Status::ConnectionError { .. })
1457// ));
1458// auth_and_connect.await.unwrap_err();
1459
1460// // Allow the connection to be established.
1461// let server = FakeServer::for_client(user_id, &client, cx).await;
1462// assert!(matches!(
1463// status.next().await,
1464// Some(Status::Connected { .. })
1465// ));
1466
1467// // Disconnect client.
1468// server.forbid_connections();
1469// server.disconnect();
1470// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
1471
1472// // Time out when re-establishing the connection.
1473// server.allow_connections();
1474// client.override_establish_connection(|_, cx| {
1475// cx.foreground().spawn(async move {
1476// future::pending::<()>().await;
1477// unreachable!()
1478// })
1479// });
1480// deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
1481// assert!(matches!(
1482// status.next().await,
1483// Some(Status::Reconnecting { .. })
1484// ));
1485
1486// deterministic.advance_clock(CONNECTION_TIMEOUT);
1487// assert!(matches!(
1488// status.next().await,
1489// Some(Status::ReconnectionError { .. })
1490// ));
1491// }
1492
1493// #[gpui::test(iterations = 10)]
1494// async fn test_authenticating_more_than_once(
1495// cx: &mut TestAppContext,
1496// deterministic: Arc<Deterministic>,
1497// ) {
1498// cx.foreground().forbid_parking();
1499
1500// let auth_count = Arc::new(Mutex::new(0));
1501// let dropped_auth_count = Arc::new(Mutex::new(0));
1502// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1503// client.override_authenticate({
1504// let auth_count = auth_count.clone();
1505// let dropped_auth_count = dropped_auth_count.clone();
1506// move |cx| {
1507// let auth_count = auth_count.clone();
1508// let dropped_auth_count = dropped_auth_count.clone();
1509// cx.foreground().spawn(async move {
1510// *auth_count.lock() += 1;
1511// let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
1512// future::pending::<()>().await;
1513// unreachable!()
1514// })
1515// }
1516// });
1517
1518// let _authenticate = cx.spawn(|cx| {
1519// let client = client.clone();
1520// async move { client.authenticate_and_connect(false, &cx).await }
1521// });
1522// deterministic.run_until_parked();
1523// assert_eq!(*auth_count.lock(), 1);
1524// assert_eq!(*dropped_auth_count.lock(), 0);
1525
1526// let _authenticate = cx.spawn(|cx| {
1527// let client = client.clone();
1528// async move { client.authenticate_and_connect(false, &cx).await }
1529// });
1530// deterministic.run_until_parked();
1531// assert_eq!(*auth_count.lock(), 2);
1532// assert_eq!(*dropped_auth_count.lock(), 1);
1533// }
1534
1535// #[test]
1536// fn test_encode_and_decode_worktree_url() {
1537// let url = encode_worktree_url(5, "deadbeef");
1538// assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
1539// assert_eq!(
1540// decode_worktree_url(&format!("\n {}\t", url)),
1541// Some((5, "deadbeef".to_string()))
1542// );
1543// assert_eq!(decode_worktree_url("not://the-right-format"), None);
1544// }
1545
1546// #[gpui::test]
1547// async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
1548// cx.foreground().forbid_parking();
1549
1550// let user_id = 5;
1551// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1552// let server = FakeServer::for_client(user_id, &client, cx).await;
1553
1554// let (done_tx1, mut done_rx1) = smol::channel::unbounded();
1555// let (done_tx2, mut done_rx2) = smol::channel::unbounded();
1556// client.add_model_message_handler(
1557// move |model: ModelHandle<Model>, _: TypedEnvelope<proto::JoinProject>, _, cx| {
1558// match model.read_with(&cx, |model, _| model.id) {
1559// 1 => done_tx1.try_send(()).unwrap(),
1560// 2 => done_tx2.try_send(()).unwrap(),
1561// _ => unreachable!(),
1562// }
1563// async { Ok(()) }
1564// },
1565// );
1566// let model1 = cx.add_model(|_| Model {
1567// id: 1,
1568// subscription: None,
1569// });
1570// let model2 = cx.add_model(|_| Model {
1571// id: 2,
1572// subscription: None,
1573// });
1574// let model3 = cx.add_model(|_| Model {
1575// id: 3,
1576// subscription: None,
1577// });
1578
1579// let _subscription1 = client
1580// .subscribe_to_entity(1)
1581// .unwrap()
1582// .set_model(&model1, &mut cx.to_async());
1583// let _subscription2 = client
1584// .subscribe_to_entity(2)
1585// .unwrap()
1586// .set_model(&model2, &mut cx.to_async());
1587// // Ensure dropping a subscription for the same entity type still allows receiving of
1588// // messages for other entity IDs of the same type.
1589// let subscription3 = client
1590// .subscribe_to_entity(3)
1591// .unwrap()
1592// .set_model(&model3, &mut cx.to_async());
1593// drop(subscription3);
1594
1595// server.send(proto::JoinProject { project_id: 1 });
1596// server.send(proto::JoinProject { project_id: 2 });
1597// done_rx1.next().await.unwrap();
1598// done_rx2.next().await.unwrap();
1599// }
1600
1601// #[gpui::test]
1602// async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
1603// cx.foreground().forbid_parking();
1604
1605// let user_id = 5;
1606// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1607// let server = FakeServer::for_client(user_id, &client, cx).await;
1608
1609// let model = cx.add_model(|_| Model::default());
1610// let (done_tx1, _done_rx1) = smol::channel::unbounded();
1611// let (done_tx2, mut done_rx2) = smol::channel::unbounded();
1612// let subscription1 = client.add_message_handler(
1613// model.clone(),
1614// move |_, _: TypedEnvelope<proto::Ping>, _, _| {
1615// done_tx1.try_send(()).unwrap();
1616// async { Ok(()) }
1617// },
1618// );
1619// drop(subscription1);
1620// let _subscription2 = client.add_message_handler(
1621// model.clone(),
1622// move |_, _: TypedEnvelope<proto::Ping>, _, _| {
1623// done_tx2.try_send(()).unwrap();
1624// async { Ok(()) }
1625// },
1626// );
1627// server.send(proto::Ping {});
1628// done_rx2.next().await.unwrap();
1629// }
1630
1631// #[gpui::test]
1632// async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
1633// cx.foreground().forbid_parking();
1634
1635// let user_id = 5;
1636// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
1637// let server = FakeServer::for_client(user_id, &client, cx).await;
1638
1639// let model = cx.add_model(|_| Model::default());
1640// let (done_tx, mut done_rx) = smol::channel::unbounded();
1641// let subscription = client.add_message_handler(
1642// model.clone(),
1643// move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
1644// model.update(&mut cx, |model, _| model.subscription.take());
1645// done_tx.try_send(()).unwrap();
1646// async { Ok(()) }
1647// },
1648// );
1649// model.update(cx, |model, _| {
1650// model.subscription = Some(subscription);
1651// });
1652// server.send(proto::Ping {});
1653// done_rx.next().await.unwrap();
1654// }
1655
1656// #[derive(Default)]
1657// struct Model {
1658// id: usize,
1659// subscription: Option<Subscription>,
1660// }
1661
1662// impl Entity for Model {
1663// type Event = ();
1664// }
1665// }