diff --git a/Cargo.lock b/Cargo.lock index 69b17a9aee78ebd8e704d9d4fd36b5247937cff7..b5f645248104b1dd1eb3eb8970a74721c6838460 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1449,6 +1449,43 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "client2" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-recursion 0.3.2", + "async-tungstenite", + "collections", + "db", + "feature_flags", + "futures 0.3.28", + "gpui", + "gpui2", + "image", + "lazy_static", + "log", + "parking_lot 0.11.2", + "postage", + "rand 0.8.5", + "rpc", + "schemars", + "serde", + "serde_derive", + "settings", + "smol", + "sum_tree", + "sysinfo", + "tempfile", + "text", + "thiserror", + "time", + "tiny_http", + "url", + "util", + "uuid 1.4.1", +] + [[package]] name = "clock" version = "0.1.0" @@ -10379,6 +10416,7 @@ dependencies = [ "backtrace", "chrono", "cli", + "client2", "collections", "ctor", "env_logger 0.9.3", diff --git a/Cargo.toml b/Cargo.toml index 99e1b4da548caef5b45e7430bddff9228e8b783f..db7a5eb2e8dcdf6ed2e5d6d2228126bc76d8c672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/channel", "crates/cli", "crates/client", + "crates/client2", "crates/clock", "crates/collab", "crates/collab_ui", diff --git a/crates/client2/Cargo.toml b/crates/client2/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..6d2d61ec1384a5674f0960ae595a40598c127d76 --- /dev/null +++ b/crates/client2/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "client2" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +path = "src/client2.rs" +doctest = false + +[features] +test-support = ["collections/test-support", "gpui/test-support", "rpc/test-support"] + +[dependencies] +collections = { path = "../collections" } +db = { path = "../db" } +gpui2 = { path = "../gpui2" } +util = { path = "../util" } +rpc = { path = "../rpc" } +text = { path = "../text" } +settings = { path = "../settings" } +feature_flags = { path = "../feature_flags" } +sum_tree = { path = "../sum_tree" } + +anyhow.workspace = true +async-recursion = "0.3" +async-tungstenite = { version = "0.16", features = ["async-tls"] } +futures.workspace = true +image = "0.23" +lazy_static.workspace = true +log.workspace = true +parking_lot.workspace = true +postage.workspace = true +rand.workspace = true +schemars.workspace = true +serde.workspace = true +serde_derive.workspace = true +smol.workspace = true +sysinfo.workspace = true +tempfile = "3" +thiserror.workspace = true +time.workspace = true +tiny_http = "0.8" +uuid.workspace = true +url = "2.2" + +[dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +rpc = { path = "../rpc", features = ["test-support"] } +settings = { path = "../settings", features = ["test-support"] } +util = { path = "../util", features = ["test-support"] } diff --git a/crates/client2/src/client2.rs b/crates/client2/src/client2.rs new file mode 100644 index 0000000000000000000000000000000000000000..9f63d0e2bed327fd306692a0c28952ea18c854fd --- /dev/null +++ b/crates/client2/src/client2.rs @@ -0,0 +1,1723 @@ +#[cfg(any(test, feature = "test-support"))] +pub mod test; + +pub mod telemetry; +pub mod user; + +use anyhow::{anyhow, Context, Result}; +use async_recursion::async_recursion; +use async_tungstenite::tungstenite::{ + error::Error as WebsocketError, + http::{Request, StatusCode}, +}; +use futures::{ + future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, + TryStreamExt, +}; +use gpui::{ + actions, platform::AppVersion, serde_json, AnyModelHandle, AnyWeakModelHandle, + AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelHandle, Task, View, ViewContext, + WeakViewHandle, +}; +use lazy_static::lazy_static; +use parking_lot::RwLock; +use postage::watch; +use rand::prelude::*; +use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::{ + any::TypeId, + collections::HashMap, + convert::TryFrom, + fmt::Write as _, + future::Future, + marker::PhantomData, + path::PathBuf, + sync::{atomic::AtomicU64, Arc, Weak}, + time::{Duration, Instant}, +}; +use telemetry::Telemetry; +use thiserror::Error; +use url::Url; +use util::channel::ReleaseChannel; +use util::http::HttpClient; +use util::{ResultExt, TryFutureExt}; + +pub use rpc::*; +pub use telemetry::ClickhouseEvent; +pub use user::*; + +lazy_static! { + pub static ref ZED_SERVER_URL: String = + std::env::var("ZED_SERVER_URL").unwrap_or_else(|_| "https://zed.dev".to_string()); + pub static ref IMPERSONATE_LOGIN: Option = std::env::var("ZED_IMPERSONATE") + .ok() + .and_then(|s| if s.is_empty() { None } else { Some(s) }); + pub static ref ADMIN_API_TOKEN: Option = std::env::var("ZED_ADMIN_API_TOKEN") + .ok() + .and_then(|s| if s.is_empty() { None } else { Some(s) }); + pub static ref ZED_APP_VERSION: Option = std::env::var("ZED_APP_VERSION") + .ok() + .and_then(|v| v.parse().ok()); + pub static ref ZED_APP_PATH: Option = + std::env::var("ZED_APP_PATH").ok().map(PathBuf::from); + pub static ref ZED_ALWAYS_ACTIVE: bool = + std::env::var("ZED_ALWAYS_ACTIVE").map_or(false, |e| e.len() > 0); +} + +pub const ZED_SECRET_CLIENT_TOKEN: &str = "618033988749894"; +pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100); +pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); + +actions!(client, [SignIn, SignOut, Reconnect]); + +pub fn init_settings(cx: &mut AppContext) { + settings::register::(cx); +} + +pub fn init(client: &Arc, cx: &mut AppContext) { + init_settings(cx); + + let client = Arc::downgrade(client); + cx.add_global_action({ + let client = client.clone(); + move |_: &SignIn, cx| { + if let Some(client) = client.upgrade() { + cx.spawn( + |cx| async move { client.authenticate_and_connect(true, &cx).log_err().await }, + ) + .detach(); + } + } + }); + cx.add_global_action({ + let client = client.clone(); + move |_: &SignOut, cx| { + if let Some(client) = client.upgrade() { + cx.spawn(|cx| async move { + client.disconnect(&cx); + }) + .detach(); + } + } + }); + cx.add_global_action({ + let client = client.clone(); + move |_: &Reconnect, cx| { + if let Some(client) = client.upgrade() { + cx.spawn(|cx| async move { + client.reconnect(&cx); + }) + .detach(); + } + } + }); +} + +pub struct Client { + id: AtomicU64, + peer: Arc, + http: Arc, + telemetry: Arc, + state: RwLock, + + #[allow(clippy::type_complexity)] + #[cfg(any(test, feature = "test-support"))] + authenticate: RwLock< + Option Task>>>, + >, + + #[allow(clippy::type_complexity)] + #[cfg(any(test, feature = "test-support"))] + establish_connection: RwLock< + Option< + Box< + dyn 'static + + Send + + Sync + + Fn( + &Credentials, + &AsyncAppContext, + ) -> Task>, + >, + >, + >, +} + +#[derive(Error, Debug)] +pub enum EstablishConnectionError { + #[error("upgrade required")] + UpgradeRequired, + #[error("unauthorized")] + Unauthorized, + #[error("{0}")] + Other(#[from] anyhow::Error), + #[error("{0}")] + Http(#[from] util::http::Error), + #[error("{0}")] + Io(#[from] std::io::Error), + #[error("{0}")] + Websocket(#[from] async_tungstenite::tungstenite::http::Error), +} + +impl From for EstablishConnectionError { + fn from(error: WebsocketError) -> Self { + if let WebsocketError::Http(response) = &error { + match response.status() { + StatusCode::UNAUTHORIZED => return EstablishConnectionError::Unauthorized, + StatusCode::UPGRADE_REQUIRED => return EstablishConnectionError::UpgradeRequired, + _ => {} + } + } + EstablishConnectionError::Other(error.into()) + } +} + +impl EstablishConnectionError { + pub fn other(error: impl Into + Send + Sync) -> Self { + Self::Other(error.into()) + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Status { + SignedOut, + UpgradeRequired, + Authenticating, + Connecting, + ConnectionError, + Connected { + peer_id: PeerId, + connection_id: ConnectionId, + }, + ConnectionLost, + Reauthenticating, + Reconnecting, + ReconnectionError { + next_reconnection: Instant, + }, +} + +impl Status { + pub fn is_connected(&self) -> bool { + matches!(self, Self::Connected { .. }) + } + + pub fn is_signed_out(&self) -> bool { + matches!(self, Self::SignedOut | Self::UpgradeRequired) + } +} + +struct ClientState { + credentials: Option, + status: (watch::Sender, watch::Receiver), + entity_id_extractors: HashMap u64>, + _reconnect_task: Option>, + reconnect_interval: Duration, + entities_by_type_and_remote_id: HashMap<(TypeId, u64), WeakSubscriber>, + models_by_message_type: HashMap, + entity_types_by_message_type: HashMap, + #[allow(clippy::type_complexity)] + message_handlers: HashMap< + TypeId, + Arc< + dyn Send + + Sync + + Fn( + Subscriber, + Box, + &Arc, + AsyncAppContext, + ) -> LocalBoxFuture<'static, Result<()>>, + >, + >, +} + +enum WeakSubscriber { + Model(AnyWeakModelHandle), + View(AnyWeakViewHandle), + Pending(Vec>), +} + +enum Subscriber { + Model(AnyModelHandle), + View(AnyWeakViewHandle), +} + +#[derive(Clone, Debug)] +pub struct Credentials { + pub user_id: u64, + pub access_token: String, +} + +impl Default for ClientState { + fn default() -> Self { + Self { + credentials: None, + status: watch::channel_with(Status::SignedOut), + entity_id_extractors: Default::default(), + _reconnect_task: None, + reconnect_interval: Duration::from_secs(5), + models_by_message_type: Default::default(), + entities_by_type_and_remote_id: Default::default(), + entity_types_by_message_type: Default::default(), + message_handlers: Default::default(), + } + } +} + +pub enum Subscription { + Entity { + client: Weak, + id: (TypeId, u64), + }, + Message { + client: Weak, + id: TypeId, + }, +} + +impl Drop for Subscription { + fn drop(&mut self) { + match self { + Subscription::Entity { client, id } => { + if let Some(client) = client.upgrade() { + let mut state = client.state.write(); + let _ = state.entities_by_type_and_remote_id.remove(id); + } + } + Subscription::Message { client, id } => { + if let Some(client) = client.upgrade() { + let mut state = client.state.write(); + let _ = state.entity_types_by_message_type.remove(id); + let _ = state.message_handlers.remove(id); + } + } + } + } +} + +pub struct PendingEntitySubscription { + client: Arc, + remote_id: u64, + _entity_type: PhantomData, + consumed: bool, +} + +impl PendingEntitySubscription { + pub fn set_model(mut self, model: &ModelHandle, cx: &mut AsyncAppContext) -> Subscription { + self.consumed = true; + let mut state = self.client.state.write(); + let id = (TypeId::of::(), self.remote_id); + let Some(WeakSubscriber::Pending(messages)) = + state.entities_by_type_and_remote_id.remove(&id) + else { + unreachable!() + }; + + state + .entities_by_type_and_remote_id + .insert(id, WeakSubscriber::Model(model.downgrade().into_any())); + drop(state); + for message in messages { + self.client.handle_message(message, cx); + } + Subscription::Entity { + client: Arc::downgrade(&self.client), + id, + } + } +} + +impl Drop for PendingEntitySubscription { + fn drop(&mut self) { + if !self.consumed { + let mut state = self.client.state.write(); + if let Some(WeakSubscriber::Pending(messages)) = state + .entities_by_type_and_remote_id + .remove(&(TypeId::of::(), self.remote_id)) + { + for message in messages { + log::info!("unhandled message {}", message.payload_type_name()); + } + } + } + } +} + +#[derive(Copy, Clone)] +pub struct TelemetrySettings { + pub diagnostics: bool, + pub metrics: bool, +} + +#[derive(Default, Clone, Serialize, Deserialize, JsonSchema)] +pub struct TelemetrySettingsContent { + pub diagnostics: Option, + pub metrics: Option, +} + +impl settings::Setting for TelemetrySettings { + const KEY: Option<&'static str> = Some("telemetry"); + + type FileContent = TelemetrySettingsContent; + + fn load( + default_value: &Self::FileContent, + user_values: &[&Self::FileContent], + _: &AppContext, + ) -> Result { + Ok(Self { + diagnostics: user_values.first().and_then(|v| v.diagnostics).unwrap_or( + default_value + .diagnostics + .ok_or_else(Self::missing_default)?, + ), + metrics: user_values + .first() + .and_then(|v| v.metrics) + .unwrap_or(default_value.metrics.ok_or_else(Self::missing_default)?), + }) + } +} + +impl Client { + pub fn new(http: Arc, cx: &AppContext) -> Arc { + Arc::new(Self { + id: AtomicU64::new(0), + peer: Peer::new(0), + telemetry: Telemetry::new(http.clone(), cx), + http, + state: Default::default(), + + #[cfg(any(test, feature = "test-support"))] + authenticate: Default::default(), + #[cfg(any(test, feature = "test-support"))] + establish_connection: Default::default(), + }) + } + + pub fn id(&self) -> u64 { + self.id.load(std::sync::atomic::Ordering::SeqCst) + } + + pub fn http_client(&self) -> Arc { + self.http.clone() + } + + pub fn set_id(&self, id: u64) -> &Self { + self.id.store(id, std::sync::atomic::Ordering::SeqCst); + self + } + + #[cfg(any(test, feature = "test-support"))] + pub fn teardown(&self) { + let mut state = self.state.write(); + state._reconnect_task.take(); + state.message_handlers.clear(); + state.models_by_message_type.clear(); + state.entities_by_type_and_remote_id.clear(); + state.entity_id_extractors.clear(); + self.peer.teardown(); + } + + #[cfg(any(test, feature = "test-support"))] + pub fn override_authenticate(&self, authenticate: F) -> &Self + where + F: 'static + Send + Sync + Fn(&AsyncAppContext) -> Task>, + { + *self.authenticate.write() = Some(Box::new(authenticate)); + self + } + + #[cfg(any(test, feature = "test-support"))] + pub fn override_establish_connection(&self, connect: F) -> &Self + where + F: 'static + + Send + + Sync + + Fn(&Credentials, &AsyncAppContext) -> Task>, + { + *self.establish_connection.write() = Some(Box::new(connect)); + self + } + + pub fn user_id(&self) -> Option { + self.state + .read() + .credentials + .as_ref() + .map(|credentials| credentials.user_id) + } + + pub fn peer_id(&self) -> Option { + if let Status::Connected { peer_id, .. } = &*self.status().borrow() { + Some(*peer_id) + } else { + None + } + } + + pub fn status(&self) -> watch::Receiver { + self.state.read().status.1.clone() + } + + fn set_status(self: &Arc, status: Status, cx: &AsyncAppContext) { + log::info!("set status on client {}: {:?}", self.id(), status); + let mut state = self.state.write(); + *state.status.0.borrow_mut() = status; + + match status { + Status::Connected { .. } => { + state._reconnect_task = None; + } + Status::ConnectionLost => { + let this = self.clone(); + let reconnect_interval = state.reconnect_interval; + state._reconnect_task = Some(cx.spawn(|cx| async move { + #[cfg(any(test, feature = "test-support"))] + let mut rng = StdRng::seed_from_u64(0); + #[cfg(not(any(test, feature = "test-support")))] + let mut rng = StdRng::from_entropy(); + + let mut delay = INITIAL_RECONNECTION_DELAY; + while let Err(error) = this.authenticate_and_connect(true, &cx).await { + log::error!("failed to connect {}", error); + if matches!(*this.status().borrow(), Status::ConnectionError) { + this.set_status( + Status::ReconnectionError { + next_reconnection: Instant::now() + delay, + }, + &cx, + ); + cx.background().timer(delay).await; + delay = delay + .mul_f32(rng.gen_range(1.0..=2.0)) + .min(reconnect_interval); + } else { + break; + } + } + })); + } + Status::SignedOut | Status::UpgradeRequired => { + cx.read(|cx| self.telemetry.set_authenticated_user_info(None, false, cx)); + state._reconnect_task.take(); + } + _ => {} + } + } + + pub fn add_view_for_remote_entity( + self: &Arc, + remote_id: u64, + cx: &mut ViewContext, + ) -> Subscription { + let id = (TypeId::of::(), remote_id); + self.state + .write() + .entities_by_type_and_remote_id + .insert(id, WeakSubscriber::View(cx.weak_handle().into_any())); + Subscription::Entity { + client: Arc::downgrade(self), + id, + } + } + + pub fn subscribe_to_entity( + self: &Arc, + remote_id: u64, + ) -> Result> { + let id = (TypeId::of::(), remote_id); + + let mut state = self.state.write(); + if state.entities_by_type_and_remote_id.contains_key(&id) { + return Err(anyhow!("already subscribed to entity")); + } else { + state + .entities_by_type_and_remote_id + .insert(id, WeakSubscriber::Pending(Default::default())); + Ok(PendingEntitySubscription { + client: self.clone(), + remote_id, + consumed: false, + _entity_type: PhantomData, + }) + } + } + + #[track_caller] + pub fn add_message_handler( + self: &Arc, + model: ModelHandle, + handler: H, + ) -> Subscription + where + M: EnvelopedMessage, + E: Entity, + H: 'static + + Send + + Sync + + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + let message_type_id = TypeId::of::(); + + let mut state = self.state.write(); + state + .models_by_message_type + .insert(message_type_id, model.downgrade().into_any()); + + let prev_handler = state.message_handlers.insert( + message_type_id, + Arc::new(move |handle, envelope, client, cx| { + let handle = if let Subscriber::Model(handle) = handle { + handle + } else { + unreachable!(); + }; + let model = handle.downcast::().unwrap(); + let envelope = envelope.into_any().downcast::>().unwrap(); + handler(model, *envelope, client.clone(), cx).boxed_local() + }), + ); + if prev_handler.is_some() { + let location = std::panic::Location::caller(); + panic!( + "{}:{} registered handler for the same message {} twice", + location.file(), + location.line(), + std::any::type_name::() + ); + } + + Subscription::Message { + client: Arc::downgrade(self), + id: message_type_id, + } + } + + pub fn add_request_handler( + self: &Arc, + model: ModelHandle, + handler: H, + ) -> Subscription + where + M: RequestMessage, + E: Entity, + H: 'static + + Send + + Sync + + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + self.add_message_handler(model, move |handle, envelope, this, cx| { + Self::respond_to_request( + envelope.receipt(), + handler(handle, envelope, this.clone(), cx), + this, + ) + }) + } + + pub fn add_view_message_handler(self: &Arc, handler: H) + where + M: EntityMessage, + E: View, + H: 'static + + Send + + Sync + + Fn(WeakViewHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + self.add_entity_message_handler::(move |handle, message, client, cx| { + if let Subscriber::View(handle) = handle { + handler(handle.downcast::().unwrap(), message, client, cx) + } else { + unreachable!(); + } + }) + } + + pub fn add_model_message_handler(self: &Arc, handler: H) + where + M: EntityMessage, + E: Entity, + H: 'static + + Send + + Sync + + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + self.add_entity_message_handler::(move |handle, message, client, cx| { + if let Subscriber::Model(handle) = handle { + handler(handle.downcast::().unwrap(), message, client, cx) + } else { + unreachable!(); + } + }) + } + + fn add_entity_message_handler(self: &Arc, handler: H) + where + M: EntityMessage, + E: Entity, + H: 'static + + Send + + Sync + + Fn(Subscriber, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + let model_type_id = TypeId::of::(); + let message_type_id = TypeId::of::(); + + let mut state = self.state.write(); + state + .entity_types_by_message_type + .insert(message_type_id, model_type_id); + state + .entity_id_extractors + .entry(message_type_id) + .or_insert_with(|| { + |envelope| { + envelope + .as_any() + .downcast_ref::>() + .unwrap() + .payload + .remote_entity_id() + } + }); + let prev_handler = state.message_handlers.insert( + message_type_id, + Arc::new(move |handle, envelope, client, cx| { + let envelope = envelope.into_any().downcast::>().unwrap(); + handler(handle, *envelope, client.clone(), cx).boxed_local() + }), + ); + if prev_handler.is_some() { + panic!("registered handler for the same message twice"); + } + } + + pub fn add_model_request_handler(self: &Arc, handler: H) + where + M: EntityMessage + RequestMessage, + E: Entity, + H: 'static + + Send + + Sync + + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + self.add_model_message_handler(move |entity, envelope, client, cx| { + Self::respond_to_request::( + envelope.receipt(), + handler(entity, envelope, client.clone(), cx), + client, + ) + }) + } + + pub fn add_view_request_handler(self: &Arc, handler: H) + where + M: EntityMessage + RequestMessage, + E: View, + H: 'static + + Send + + Sync + + Fn(WeakViewHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + F: 'static + Future>, + { + self.add_view_message_handler(move |entity, envelope, client, cx| { + Self::respond_to_request::( + envelope.receipt(), + handler(entity, envelope, client.clone(), cx), + client, + ) + }) + } + + async fn respond_to_request>>( + receipt: Receipt, + response: F, + client: Arc, + ) -> Result<()> { + match response.await { + Ok(response) => { + client.respond(receipt, response)?; + Ok(()) + } + Err(error) => { + client.respond_with_error( + receipt, + proto::Error { + message: format!("{:?}", error), + }, + )?; + Err(error) + } + } + } + + pub fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool { + read_credentials_from_keychain(cx).is_some() + } + + #[async_recursion(?Send)] + pub async fn authenticate_and_connect( + self: &Arc, + try_keychain: bool, + cx: &AsyncAppContext, + ) -> anyhow::Result<()> { + let was_disconnected = match *self.status().borrow() { + Status::SignedOut => true, + Status::ConnectionError + | Status::ConnectionLost + | Status::Authenticating { .. } + | Status::Reauthenticating { .. } + | Status::ReconnectionError { .. } => false, + Status::Connected { .. } | Status::Connecting { .. } | Status::Reconnecting { .. } => { + return Ok(()) + } + Status::UpgradeRequired => return Err(EstablishConnectionError::UpgradeRequired)?, + }; + + if was_disconnected { + self.set_status(Status::Authenticating, cx); + } else { + self.set_status(Status::Reauthenticating, cx) + } + + let mut read_from_keychain = false; + let mut credentials = self.state.read().credentials.clone(); + if credentials.is_none() && try_keychain { + credentials = read_credentials_from_keychain(cx); + read_from_keychain = credentials.is_some(); + } + if credentials.is_none() { + let mut status_rx = self.status(); + let _ = status_rx.next().await; + futures::select_biased! { + authenticate = self.authenticate(cx).fuse() => { + match authenticate { + Ok(creds) => credentials = Some(creds), + Err(err) => { + self.set_status(Status::ConnectionError, cx); + return Err(err); + } + } + } + _ = status_rx.next().fuse() => { + return Err(anyhow!("authentication canceled")); + } + } + } + let credentials = credentials.unwrap(); + self.set_id(credentials.user_id); + + if was_disconnected { + self.set_status(Status::Connecting, cx); + } else { + self.set_status(Status::Reconnecting, cx); + } + + let mut timeout = cx.background().timer(CONNECTION_TIMEOUT).fuse(); + futures::select_biased! { + connection = self.establish_connection(&credentials, cx).fuse() => { + match connection { + Ok(conn) => { + self.state.write().credentials = Some(credentials.clone()); + if !read_from_keychain && IMPERSONATE_LOGIN.is_none() { + write_credentials_to_keychain(&credentials, cx).log_err(); + } + + futures::select_biased! { + result = self.set_connection(conn, cx).fuse() => result, + _ = timeout => { + self.set_status(Status::ConnectionError, cx); + Err(anyhow!("timed out waiting on hello message from server")) + } + } + } + Err(EstablishConnectionError::Unauthorized) => { + self.state.write().credentials.take(); + if read_from_keychain { + cx.platform().delete_credentials(&ZED_SERVER_URL).log_err(); + self.set_status(Status::SignedOut, cx); + self.authenticate_and_connect(false, cx).await + } else { + self.set_status(Status::ConnectionError, cx); + Err(EstablishConnectionError::Unauthorized)? + } + } + Err(EstablishConnectionError::UpgradeRequired) => { + self.set_status(Status::UpgradeRequired, cx); + Err(EstablishConnectionError::UpgradeRequired)? + } + Err(error) => { + self.set_status(Status::ConnectionError, cx); + Err(error)? + } + } + } + _ = &mut timeout => { + self.set_status(Status::ConnectionError, cx); + Err(anyhow!("timed out trying to establish connection")) + } + } + } + + async fn set_connection( + self: &Arc, + conn: Connection, + cx: &AsyncAppContext, + ) -> Result<()> { + let executor = cx.background(); + log::info!("add connection to peer"); + let (connection_id, handle_io, mut incoming) = self + .peer + .add_connection(conn, move |duration| executor.timer(duration)); + let handle_io = cx.background().spawn(handle_io); + + let peer_id = async { + log::info!("waiting for server hello"); + let message = incoming + .next() + .await + .ok_or_else(|| anyhow!("no hello message received"))?; + log::info!("got server hello"); + let hello_message_type_name = message.payload_type_name().to_string(); + let hello = message + .into_any() + .downcast::>() + .map_err(|_| { + anyhow!( + "invalid hello message received: {:?}", + hello_message_type_name + ) + })?; + let peer_id = hello + .payload + .peer_id + .ok_or_else(|| anyhow!("invalid peer id"))?; + Ok(peer_id) + }; + + let peer_id = match peer_id.await { + Ok(peer_id) => peer_id, + Err(error) => { + self.peer.disconnect(connection_id); + return Err(error); + } + }; + + log::info!( + "set status to connected (connection id: {:?}, peer id: {:?})", + connection_id, + peer_id + ); + self.set_status( + Status::Connected { + peer_id, + connection_id, + }, + cx, + ); + cx.foreground() + .spawn({ + let cx = cx.clone(); + let this = self.clone(); + async move { + while let Some(message) = incoming.next().await { + this.handle_message(message, &cx); + // Don't starve the main thread when receiving lots of messages at once. + smol::future::yield_now().await; + } + } + }) + .detach(); + + let this = self.clone(); + let cx = cx.clone(); + cx.foreground() + .spawn(async move { + match handle_io.await { + Ok(()) => { + if this.status().borrow().clone() + == (Status::Connected { + connection_id, + peer_id, + }) + { + this.set_status(Status::SignedOut, &cx); + } + } + Err(err) => { + log::error!("connection error: {:?}", err); + this.set_status(Status::ConnectionLost, &cx); + } + } + }) + .detach(); + + Ok(()) + } + + fn authenticate(self: &Arc, cx: &AsyncAppContext) -> Task> { + #[cfg(any(test, feature = "test-support"))] + if let Some(callback) = self.authenticate.read().as_ref() { + return callback(cx); + } + + self.authenticate_with_browser(cx) + } + + fn establish_connection( + self: &Arc, + credentials: &Credentials, + cx: &AsyncAppContext, + ) -> Task> { + #[cfg(any(test, feature = "test-support"))] + if let Some(callback) = self.establish_connection.read().as_ref() { + return callback(credentials, cx); + } + + self.establish_websocket_connection(credentials, cx) + } + + async fn get_rpc_url(http: Arc, is_preview: bool) -> Result { + let preview_param = if is_preview { "?preview=1" } else { "" }; + let url = format!("{}/rpc{preview_param}", *ZED_SERVER_URL); + let response = http.get(&url, Default::default(), false).await?; + + // Normally, ZED_SERVER_URL is set to the URL of zed.dev website. + // The website's /rpc endpoint redirects to a collab server's /rpc endpoint, + // which requires authorization via an HTTP header. + // + // For testing purposes, ZED_SERVER_URL can also set to the direct URL of + // of a collab server. In that case, a request to the /rpc endpoint will + // return an 'unauthorized' response. + let collab_url = if response.status().is_redirection() { + response + .headers() + .get("Location") + .ok_or_else(|| anyhow!("missing location header in /rpc response"))? + .to_str() + .map_err(EstablishConnectionError::other)? + .to_string() + } else if response.status() == StatusCode::UNAUTHORIZED { + url + } else { + Err(anyhow!( + "unexpected /rpc response status {}", + response.status() + ))? + }; + + Url::parse(&collab_url).context("invalid rpc url") + } + + fn establish_websocket_connection( + self: &Arc, + credentials: &Credentials, + cx: &AsyncAppContext, + ) -> Task> { + let use_preview_server = cx.read(|cx| { + if cx.has_global::() { + *cx.global::() != ReleaseChannel::Stable + } else { + false + } + }); + + let request = Request::builder() + .header( + "Authorization", + format!("{} {}", credentials.user_id, credentials.access_token), + ) + .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION); + + let http = self.http.clone(); + cx.background().spawn(async move { + let mut rpc_url = Self::get_rpc_url(http, use_preview_server).await?; + let rpc_host = rpc_url + .host_str() + .zip(rpc_url.port_or_known_default()) + .ok_or_else(|| anyhow!("missing host in rpc url"))?; + let stream = smol::net::TcpStream::connect(rpc_host).await?; + + log::info!("connected to rpc endpoint {}", rpc_url); + + match rpc_url.scheme() { + "https" => { + rpc_url.set_scheme("wss").unwrap(); + let request = request.uri(rpc_url.as_str()).body(())?; + let (stream, _) = + async_tungstenite::async_tls::client_async_tls(request, stream).await?; + Ok(Connection::new( + stream + .map_err(|error| anyhow!(error)) + .sink_map_err(|error| anyhow!(error)), + )) + } + "http" => { + rpc_url.set_scheme("ws").unwrap(); + let request = request.uri(rpc_url.as_str()).body(())?; + let (stream, _) = async_tungstenite::client_async(request, stream).await?; + Ok(Connection::new( + stream + .map_err(|error| anyhow!(error)) + .sink_map_err(|error| anyhow!(error)), + )) + } + _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?, + } + }) + } + + pub fn authenticate_with_browser( + self: &Arc, + cx: &AsyncAppContext, + ) -> Task> { + let platform = cx.platform(); + let executor = cx.background(); + let http = self.http.clone(); + + executor.clone().spawn(async move { + // Generate a pair of asymmetric encryption keys. The public key will be used by the + // zed server to encrypt the user's access token, so that it can'be intercepted by + // any other app running on the user's device. + let (public_key, private_key) = + rpc::auth::keypair().expect("failed to generate keypair for auth"); + let public_key_string = + String::try_from(public_key).expect("failed to serialize public key for auth"); + + if let Some((login, token)) = IMPERSONATE_LOGIN.as_ref().zip(ADMIN_API_TOKEN.as_ref()) { + return Self::authenticate_as_admin(http, login.clone(), token.clone()).await; + } + + // Start an HTTP server to receive the redirect from Zed's sign-in page. + let server = tiny_http::Server::http("127.0.0.1:0").expect("failed to find open port"); + let port = server.server_addr().port(); + + // Open the Zed sign-in page in the user's browser, with query parameters that indicate + // that the user is signing in from a Zed app running on the same device. + let mut url = format!( + "{}/native_app_signin?native_app_port={}&native_app_public_key={}", + *ZED_SERVER_URL, port, public_key_string + ); + + if let Some(impersonate_login) = IMPERSONATE_LOGIN.as_ref() { + log::info!("impersonating user @{}", impersonate_login); + write!(&mut url, "&impersonate={}", impersonate_login).unwrap(); + } + + platform.open_url(&url); + + // Receive the HTTP request from the user's browser. Retrieve the user id and encrypted + // access token from the query params. + // + // TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a + // custom URL scheme instead of this local HTTP server. + let (user_id, access_token) = executor + .spawn(async move { + for _ in 0..100 { + if let Some(req) = server.recv_timeout(Duration::from_secs(1))? { + let path = req.url(); + let mut user_id = None; + let mut access_token = None; + let url = Url::parse(&format!("http://example.com{}", path)) + .context("failed to parse login notification url")?; + for (key, value) in url.query_pairs() { + if key == "access_token" { + access_token = Some(value.to_string()); + } else if key == "user_id" { + user_id = Some(value.to_string()); + } + } + + let post_auth_url = + format!("{}/native_app_signin_succeeded", *ZED_SERVER_URL); + req.respond( + tiny_http::Response::empty(302).with_header( + tiny_http::Header::from_bytes( + &b"Location"[..], + post_auth_url.as_bytes(), + ) + .unwrap(), + ), + ) + .context("failed to respond to login http request")?; + return Ok(( + user_id.ok_or_else(|| anyhow!("missing user_id parameter"))?, + access_token + .ok_or_else(|| anyhow!("missing access_token parameter"))?, + )); + } + } + + Err(anyhow!("didn't receive login redirect")) + }) + .await?; + + let access_token = private_key + .decrypt_string(&access_token) + .context("failed to decrypt access token")?; + platform.activate(true); + + Ok(Credentials { + user_id: user_id.parse()?, + access_token, + }) + }) + } + + async fn authenticate_as_admin( + http: Arc, + login: String, + mut api_token: String, + ) -> Result { + #[derive(Deserialize)] + struct AuthenticatedUserResponse { + user: User, + } + + #[derive(Deserialize)] + struct User { + id: u64, + } + + // Use the collab server's admin API to retrieve the id + // of the impersonated user. + let mut url = Self::get_rpc_url(http.clone(), false).await?; + url.set_path("/user"); + url.set_query(Some(&format!("github_login={login}"))); + let request = Request::get(url.as_str()) + .header("Authorization", format!("token {api_token}")) + .body("".into())?; + + let mut response = http.send(request).await?; + let mut body = String::new(); + response.body_mut().read_to_string(&mut body).await?; + if !response.status().is_success() { + Err(anyhow!( + "admin user request failed {} - {}", + response.status().as_u16(), + body, + ))?; + } + let response: AuthenticatedUserResponse = serde_json::from_str(&body)?; + + // Use the admin API token to authenticate as the impersonated user. + api_token.insert_str(0, "ADMIN_TOKEN:"); + Ok(Credentials { + user_id: response.user.id, + access_token: api_token, + }) + } + + pub fn disconnect(self: &Arc, cx: &AsyncAppContext) { + self.peer.teardown(); + self.set_status(Status::SignedOut, cx); + } + + pub fn reconnect(self: &Arc, cx: &AsyncAppContext) { + self.peer.teardown(); + self.set_status(Status::ConnectionLost, cx); + } + + fn connection_id(&self) -> Result { + if let Status::Connected { connection_id, .. } = *self.status().borrow() { + Ok(connection_id) + } else { + Err(anyhow!("not connected")) + } + } + + pub fn send(&self, message: T) -> Result<()> { + log::debug!("rpc send. client_id:{}, name:{}", self.id(), T::NAME); + self.peer.send(self.connection_id()?, message) + } + + pub fn request( + &self, + request: T, + ) -> impl Future> { + self.request_envelope(request) + .map_ok(|envelope| envelope.payload) + } + + pub fn request_envelope( + &self, + request: T, + ) -> impl Future>> { + let client_id = self.id(); + log::debug!( + "rpc request start. client_id:{}. name:{}", + client_id, + T::NAME + ); + let response = self + .connection_id() + .map(|conn_id| self.peer.request_envelope(conn_id, request)); + async move { + let response = response?.await; + log::debug!( + "rpc request finish. client_id:{}. name:{}", + client_id, + T::NAME + ); + response + } + } + + fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { + log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME); + self.peer.respond(receipt, response) + } + + fn respond_with_error( + &self, + receipt: Receipt, + error: proto::Error, + ) -> Result<()> { + log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME); + self.peer.respond_with_error(receipt, error) + } + + fn handle_message( + self: &Arc, + message: Box, + cx: &AsyncAppContext, + ) { + let mut state = self.state.write(); + let type_name = message.payload_type_name(); + let payload_type_id = message.payload_type_id(); + let sender_id = message.original_sender_id(); + + let mut subscriber = None; + + if let Some(message_model) = state + .models_by_message_type + .get(&payload_type_id) + .and_then(|model| model.upgrade(cx)) + { + subscriber = Some(Subscriber::Model(message_model)); + } else if let Some((extract_entity_id, entity_type_id)) = + state.entity_id_extractors.get(&payload_type_id).zip( + state + .entity_types_by_message_type + .get(&payload_type_id) + .copied(), + ) + { + let entity_id = (extract_entity_id)(message.as_ref()); + + match state + .entities_by_type_and_remote_id + .get_mut(&(entity_type_id, entity_id)) + { + Some(WeakSubscriber::Pending(pending)) => { + pending.push(message); + return; + } + Some(weak_subscriber @ _) => match weak_subscriber { + WeakSubscriber::Model(handle) => { + subscriber = handle.upgrade(cx).map(Subscriber::Model); + } + WeakSubscriber::View(handle) => { + subscriber = Some(Subscriber::View(handle.clone())); + } + WeakSubscriber::Pending(_) => {} + }, + _ => {} + } + } + + let subscriber = if let Some(subscriber) = subscriber { + subscriber + } else { + log::info!("unhandled message {}", type_name); + self.peer.respond_with_unhandled_message(message).log_err(); + return; + }; + + let handler = state.message_handlers.get(&payload_type_id).cloned(); + // Dropping the state prevents deadlocks if the handler interacts with rpc::Client. + // It also ensures we don't hold the lock while yielding back to the executor, as + // that might cause the executor thread driving this future to block indefinitely. + drop(state); + + if let Some(handler) = handler { + let future = handler(subscriber, message, &self, cx.clone()); + let client_id = self.id(); + log::debug!( + "rpc message received. client_id:{}, sender_id:{:?}, type:{}", + client_id, + sender_id, + type_name + ); + cx.foreground() + .spawn(async move { + match future.await { + Ok(()) => { + log::debug!( + "rpc message handled. client_id:{}, sender_id:{:?}, type:{}", + client_id, + sender_id, + type_name + ); + } + Err(error) => { + log::error!( + "error handling message. client_id:{}, sender_id:{:?}, type:{}, error:{:?}", + client_id, + sender_id, + type_name, + error + ); + } + } + }) + .detach(); + } else { + log::info!("unhandled message {}", type_name); + self.peer.respond_with_unhandled_message(message).log_err(); + } + } + + pub fn telemetry(&self) -> &Arc { + &self.telemetry + } +} + +fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option { + if IMPERSONATE_LOGIN.is_some() { + return None; + } + + let (user_id, access_token) = cx + .platform() + .read_credentials(&ZED_SERVER_URL) + .log_err() + .flatten()?; + Some(Credentials { + user_id: user_id.parse().ok()?, + access_token: String::from_utf8(access_token).ok()?, + }) +} + +fn write_credentials_to_keychain(credentials: &Credentials, cx: &AsyncAppContext) -> Result<()> { + cx.platform().write_credentials( + &ZED_SERVER_URL, + &credentials.user_id.to_string(), + credentials.access_token.as_bytes(), + ) +} + +const WORKTREE_URL_PREFIX: &str = "zed://worktrees/"; + +pub fn encode_worktree_url(id: u64, access_token: &str) -> String { + format!("{}{}/{}", WORKTREE_URL_PREFIX, id, access_token) +} + +pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> { + let path = url.trim().strip_prefix(WORKTREE_URL_PREFIX)?; + let mut parts = path.split('/'); + let id = parts.next()?.parse::().ok()?; + let access_token = parts.next()?; + if access_token.is_empty() { + return None; + } + Some((id, access_token.to_string())) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::FakeServer; + use gpui::{executor::Deterministic, TestAppContext}; + use parking_lot::Mutex; + use std::future; + use util::http::FakeHttpClient; + + #[gpui::test(iterations = 10)] + async fn test_reconnection(cx: &mut TestAppContext) { + cx.foreground().forbid_parking(); + + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + let mut status = client.status(); + assert!(matches!( + status.next().await, + Some(Status::Connected { .. }) + )); + assert_eq!(server.auth_count(), 1); + + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + server.allow_connections(); + cx.foreground().advance_clock(Duration::from_secs(10)); + while !matches!(status.next().await, Some(Status::Connected { .. })) {} + assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting + + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + // Clear cached credentials after authentication fails + server.roll_access_token(); + server.allow_connections(); + cx.foreground().advance_clock(Duration::from_secs(10)); + while !matches!(status.next().await, Some(Status::Connected { .. })) {} + assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token + } + + #[gpui::test(iterations = 10)] + async fn test_connection_timeout(deterministic: Arc, cx: &mut TestAppContext) { + deterministic.forbid_parking(); + + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let mut status = client.status(); + + // Time out when client tries to connect. + client.override_authenticate(move |cx| { + cx.foreground().spawn(async move { + Ok(Credentials { + user_id, + access_token: "token".into(), + }) + }) + }); + client.override_establish_connection(|_, cx| { + cx.foreground().spawn(async move { + future::pending::<()>().await; + unreachable!() + }) + }); + let auth_and_connect = cx.spawn({ + let client = client.clone(); + |cx| async move { client.authenticate_and_connect(false, &cx).await } + }); + deterministic.run_until_parked(); + assert!(matches!(status.next().await, Some(Status::Connecting))); + + deterministic.advance_clock(CONNECTION_TIMEOUT); + assert!(matches!( + status.next().await, + Some(Status::ConnectionError { .. }) + )); + auth_and_connect.await.unwrap_err(); + + // Allow the connection to be established. + let server = FakeServer::for_client(user_id, &client, cx).await; + assert!(matches!( + status.next().await, + Some(Status::Connected { .. }) + )); + + // Disconnect client. + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + // Time out when re-establishing the connection. + server.allow_connections(); + client.override_establish_connection(|_, cx| { + cx.foreground().spawn(async move { + future::pending::<()>().await; + unreachable!() + }) + }); + deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY); + assert!(matches!( + status.next().await, + Some(Status::Reconnecting { .. }) + )); + + deterministic.advance_clock(CONNECTION_TIMEOUT); + assert!(matches!( + status.next().await, + Some(Status::ReconnectionError { .. }) + )); + } + + #[gpui::test(iterations = 10)] + async fn test_authenticating_more_than_once( + cx: &mut TestAppContext, + deterministic: Arc, + ) { + cx.foreground().forbid_parking(); + + let auth_count = Arc::new(Mutex::new(0)); + let dropped_auth_count = Arc::new(Mutex::new(0)); + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + client.override_authenticate({ + let auth_count = auth_count.clone(); + let dropped_auth_count = dropped_auth_count.clone(); + move |cx| { + let auth_count = auth_count.clone(); + let dropped_auth_count = dropped_auth_count.clone(); + cx.foreground().spawn(async move { + *auth_count.lock() += 1; + let _drop = util::defer(move || *dropped_auth_count.lock() += 1); + future::pending::<()>().await; + unreachable!() + }) + } + }); + + let _authenticate = cx.spawn(|cx| { + let client = client.clone(); + async move { client.authenticate_and_connect(false, &cx).await } + }); + deterministic.run_until_parked(); + assert_eq!(*auth_count.lock(), 1); + assert_eq!(*dropped_auth_count.lock(), 0); + + let _authenticate = cx.spawn(|cx| { + let client = client.clone(); + async move { client.authenticate_and_connect(false, &cx).await } + }); + deterministic.run_until_parked(); + assert_eq!(*auth_count.lock(), 2); + assert_eq!(*dropped_auth_count.lock(), 1); + } + + #[test] + fn test_encode_and_decode_worktree_url() { + let url = encode_worktree_url(5, "deadbeef"); + assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string()))); + assert_eq!( + decode_worktree_url(&format!("\n {}\t", url)), + Some((5, "deadbeef".to_string())) + ); + assert_eq!(decode_worktree_url("not://the-right-format"), None); + } + + #[gpui::test] + async fn test_subscribing_to_entity(cx: &mut TestAppContext) { + cx.foreground().forbid_parking(); + + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let (done_tx1, mut done_rx1) = smol::channel::unbounded(); + let (done_tx2, mut done_rx2) = smol::channel::unbounded(); + client.add_model_message_handler( + move |model: ModelHandle, _: TypedEnvelope, _, cx| { + match model.read_with(&cx, |model, _| model.id) { + 1 => done_tx1.try_send(()).unwrap(), + 2 => done_tx2.try_send(()).unwrap(), + _ => unreachable!(), + } + async { Ok(()) } + }, + ); + let model1 = cx.add_model(|_| Model { + id: 1, + subscription: None, + }); + let model2 = cx.add_model(|_| Model { + id: 2, + subscription: None, + }); + let model3 = cx.add_model(|_| Model { + id: 3, + subscription: None, + }); + + let _subscription1 = client + .subscribe_to_entity(1) + .unwrap() + .set_model(&model1, &mut cx.to_async()); + let _subscription2 = client + .subscribe_to_entity(2) + .unwrap() + .set_model(&model2, &mut cx.to_async()); + // Ensure dropping a subscription for the same entity type still allows receiving of + // messages for other entity IDs of the same type. + let subscription3 = client + .subscribe_to_entity(3) + .unwrap() + .set_model(&model3, &mut cx.to_async()); + drop(subscription3); + + server.send(proto::JoinProject { project_id: 1 }); + server.send(proto::JoinProject { project_id: 2 }); + done_rx1.next().await.unwrap(); + done_rx2.next().await.unwrap(); + } + + #[gpui::test] + async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) { + cx.foreground().forbid_parking(); + + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let model = cx.add_model(|_| Model::default()); + let (done_tx1, _done_rx1) = smol::channel::unbounded(); + let (done_tx2, mut done_rx2) = smol::channel::unbounded(); + let subscription1 = client.add_message_handler( + model.clone(), + move |_, _: TypedEnvelope, _, _| { + done_tx1.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + drop(subscription1); + let _subscription2 = client.add_message_handler( + model.clone(), + move |_, _: TypedEnvelope, _, _| { + done_tx2.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + server.send(proto::Ping {}); + done_rx2.next().await.unwrap(); + } + + #[gpui::test] + async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) { + cx.foreground().forbid_parking(); + + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let model = cx.add_model(|_| Model::default()); + let (done_tx, mut done_rx) = smol::channel::unbounded(); + let subscription = client.add_message_handler( + model.clone(), + move |model, _: TypedEnvelope, _, mut cx| { + model.update(&mut cx, |model, _| model.subscription.take()); + done_tx.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + model.update(cx, |model, _| { + model.subscription = Some(subscription); + }); + server.send(proto::Ping {}); + done_rx.next().await.unwrap(); + } + + #[derive(Default)] + struct Model { + id: usize, + subscription: Option, + } + + impl Entity for Model { + type Event = (); + } +} diff --git a/crates/client2/src/telemetry.rs b/crates/client2/src/telemetry.rs new file mode 100644 index 0000000000000000000000000000000000000000..70878bf2e46117c787cc0074f77b30494ae6f544 --- /dev/null +++ b/crates/client2/src/telemetry.rs @@ -0,0 +1,317 @@ +use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL}; +use gpui::{executor::Background, serde_json, AppContext, Task}; +use lazy_static::lazy_static; +use parking_lot::Mutex; +use serde::Serialize; +use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration}; +use sysinfo::{Pid, PidExt, ProcessExt, System, SystemExt}; +use tempfile::NamedTempFile; +use util::http::HttpClient; +use util::{channel::ReleaseChannel, TryFutureExt}; + +pub struct Telemetry { + http_client: Arc, + executor: Arc, + state: Mutex, +} + +#[derive(Default)] +struct TelemetryState { + metrics_id: Option>, // Per logged-in user + installation_id: Option>, // Per app installation (different for dev, preview, and stable) + session_id: Option>, // Per app launch + app_version: Option>, + release_channel: Option<&'static str>, + os_name: &'static str, + os_version: Option>, + architecture: &'static str, + clickhouse_events_queue: Vec, + flush_clickhouse_events_task: Option>, + log_file: Option, + is_staff: Option, +} + +const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events"; + +lazy_static! { + static ref CLICKHOUSE_EVENTS_URL: String = + format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH); +} + +#[derive(Serialize, Debug)] +struct ClickhouseEventRequestBody { + token: &'static str, + installation_id: Option>, + session_id: Option>, + is_staff: Option, + app_version: Option>, + os_name: &'static str, + os_version: Option>, + architecture: &'static str, + release_channel: Option<&'static str>, + events: Vec, +} + +#[derive(Serialize, Debug)] +struct ClickhouseEventWrapper { + signed_in: bool, + #[serde(flatten)] + event: ClickhouseEvent, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "snake_case")] +pub enum AssistantKind { + Panel, + Inline, +} + +#[derive(Serialize, Debug)] +#[serde(tag = "type")] +pub enum ClickhouseEvent { + Editor { + operation: &'static str, + file_extension: Option, + vim_mode: bool, + copilot_enabled: bool, + copilot_enabled_for_language: bool, + }, + Copilot { + suggestion_id: Option, + suggestion_accepted: bool, + file_extension: Option, + }, + Call { + operation: &'static str, + room_id: Option, + channel_id: Option, + }, + Assistant { + conversation_id: Option, + kind: AssistantKind, + model: &'static str, + }, + Cpu { + usage_as_percentage: f32, + core_count: u32, + }, + Memory { + memory_in_bytes: u64, + virtual_memory_in_bytes: u64, + }, +} + +#[cfg(debug_assertions)] +const MAX_QUEUE_LEN: usize = 1; + +#[cfg(not(debug_assertions))] +const MAX_QUEUE_LEN: usize = 10; + +#[cfg(debug_assertions)] +const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1); + +#[cfg(not(debug_assertions))] +const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); + +impl Telemetry { + pub fn new(client: Arc, cx: &AppContext) -> Arc { + let platform = cx.platform(); + let release_channel = if cx.has_global::() { + Some(cx.global::().display_name()) + } else { + None + }; + // TODO: Replace all hardware stuff with nested SystemSpecs json + let this = Arc::new(Self { + http_client: client, + executor: cx.background().clone(), + state: Mutex::new(TelemetryState { + os_name: platform.os_name().into(), + os_version: platform.os_version().ok().map(|v| v.to_string().into()), + architecture: env::consts::ARCH, + app_version: platform.app_version().ok().map(|v| v.to_string().into()), + release_channel, + installation_id: None, + metrics_id: None, + session_id: None, + clickhouse_events_queue: Default::default(), + flush_clickhouse_events_task: Default::default(), + log_file: None, + is_staff: None, + }), + }); + + this + } + + pub fn log_file_path(&self) -> Option { + Some(self.state.lock().log_file.as_ref()?.path().to_path_buf()) + } + + pub fn start( + self: &Arc, + installation_id: Option, + session_id: String, + cx: &mut AppContext, + ) { + let mut state = self.state.lock(); + state.installation_id = installation_id.map(|id| id.into()); + state.session_id = Some(session_id.into()); + let has_clickhouse_events = !state.clickhouse_events_queue.is_empty(); + drop(state); + + if has_clickhouse_events { + self.flush_clickhouse_events(); + } + + let this = self.clone(); + cx.spawn(|mut cx| async move { + let mut system = System::new_all(); + system.refresh_all(); + + loop { + // Waiting some amount of time before the first query is important to get a reasonable value + // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage + const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(60); + smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await; + + system.refresh_memory(); + system.refresh_processes(); + + let current_process = Pid::from_u32(std::process::id()); + let Some(process) = system.processes().get(¤t_process) else { + let process = current_process; + log::error!("Failed to find own process {process:?} in system process table"); + // TODO: Fire an error telemetry event + return; + }; + + let memory_event = ClickhouseEvent::Memory { + memory_in_bytes: process.memory(), + virtual_memory_in_bytes: process.virtual_memory(), + }; + + let cpu_event = ClickhouseEvent::Cpu { + usage_as_percentage: process.cpu_usage(), + core_count: system.cpus().len() as u32, + }; + + let telemetry_settings = cx.update(|cx| *settings::get::(cx)); + + this.report_clickhouse_event(memory_event, telemetry_settings); + this.report_clickhouse_event(cpu_event, telemetry_settings); + } + }) + .detach(); + } + + pub fn set_authenticated_user_info( + self: &Arc, + metrics_id: Option, + is_staff: bool, + cx: &AppContext, + ) { + if !settings::get::(cx).metrics { + return; + } + + let mut state = self.state.lock(); + let metrics_id: Option> = metrics_id.map(|id| id.into()); + state.metrics_id = metrics_id.clone(); + state.is_staff = Some(is_staff); + drop(state); + } + + pub fn report_clickhouse_event( + self: &Arc, + event: ClickhouseEvent, + telemetry_settings: TelemetrySettings, + ) { + if !telemetry_settings.metrics { + return; + } + + let mut state = self.state.lock(); + let signed_in = state.metrics_id.is_some(); + state + .clickhouse_events_queue + .push(ClickhouseEventWrapper { signed_in, event }); + + if state.installation_id.is_some() { + if state.clickhouse_events_queue.len() >= MAX_QUEUE_LEN { + drop(state); + self.flush_clickhouse_events(); + } else { + let this = self.clone(); + let executor = self.executor.clone(); + state.flush_clickhouse_events_task = Some(self.executor.spawn(async move { + executor.timer(DEBOUNCE_INTERVAL).await; + this.flush_clickhouse_events(); + })); + } + } + } + + pub fn metrics_id(self: &Arc) -> Option> { + self.state.lock().metrics_id.clone() + } + + pub fn installation_id(self: &Arc) -> Option> { + self.state.lock().installation_id.clone() + } + + pub fn is_staff(self: &Arc) -> Option { + self.state.lock().is_staff + } + + fn flush_clickhouse_events(self: &Arc) { + let mut state = self.state.lock(); + let mut events = mem::take(&mut state.clickhouse_events_queue); + state.flush_clickhouse_events_task.take(); + drop(state); + + let this = self.clone(); + self.executor + .spawn( + async move { + let mut json_bytes = Vec::new(); + + if let Some(file) = &mut this.state.lock().log_file { + let file = file.as_file_mut(); + for event in &mut events { + json_bytes.clear(); + serde_json::to_writer(&mut json_bytes, event)?; + file.write_all(&json_bytes)?; + file.write(b"\n")?; + } + } + + { + let state = this.state.lock(); + let request_body = ClickhouseEventRequestBody { + token: ZED_SECRET_CLIENT_TOKEN, + installation_id: state.installation_id.clone(), + session_id: state.session_id.clone(), + is_staff: state.is_staff.clone(), + app_version: state.app_version.clone(), + os_name: state.os_name, + os_version: state.os_version.clone(), + architecture: state.architecture, + + release_channel: state.release_channel, + events, + }; + json_bytes.clear(); + serde_json::to_writer(&mut json_bytes, &request_body)?; + } + + this.http_client + .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into()) + .await?; + anyhow::Ok(()) + } + .log_err(), + ) + .detach(); + } +} diff --git a/crates/client2/src/test.rs b/crates/client2/src/test.rs new file mode 100644 index 0000000000000000000000000000000000000000..38cd12f21cdf74ed895ff9d55ae1995e2efcfafa --- /dev/null +++ b/crates/client2/src/test.rs @@ -0,0 +1,215 @@ +use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore}; +use anyhow::{anyhow, Result}; +use futures::{stream::BoxStream, StreamExt}; +use gpui::{executor, ModelHandle, TestAppContext}; +use parking_lot::Mutex; +use rpc::{ + proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse}, + ConnectionId, Peer, Receipt, TypedEnvelope, +}; +use std::{rc::Rc, sync::Arc}; +use util::http::FakeHttpClient; + +pub struct FakeServer { + peer: Arc, + state: Arc>, + user_id: u64, + executor: Rc, +} + +#[derive(Default)] +struct FakeServerState { + incoming: Option>>, + connection_id: Option, + forbid_connections: bool, + auth_count: usize, + access_token: usize, +} + +impl FakeServer { + pub async fn for_client( + client_user_id: u64, + client: &Arc, + cx: &TestAppContext, + ) -> Self { + let server = Self { + peer: Peer::new(0), + state: Default::default(), + user_id: client_user_id, + executor: cx.foreground(), + }; + + client + .override_authenticate({ + let state = Arc::downgrade(&server.state); + move |cx| { + let state = state.clone(); + cx.spawn(move |_| async move { + let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + let mut state = state.lock(); + state.auth_count += 1; + let access_token = state.access_token.to_string(); + Ok(Credentials { + user_id: client_user_id, + access_token, + }) + }) + } + }) + .override_establish_connection({ + let peer = Arc::downgrade(&server.peer); + let state = Arc::downgrade(&server.state); + move |credentials, cx| { + let peer = peer.clone(); + let state = state.clone(); + let credentials = credentials.clone(); + cx.spawn(move |cx| async move { + let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + if state.lock().forbid_connections { + Err(EstablishConnectionError::Other(anyhow!( + "server is forbidding connections" + )))? + } + + assert_eq!(credentials.user_id, client_user_id); + + if credentials.access_token != state.lock().access_token.to_string() { + Err(EstablishConnectionError::Unauthorized)? + } + + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); + let (connection_id, io, incoming) = + peer.add_test_connection(server_conn, cx.background()); + cx.background().spawn(io).detach(); + { + let mut state = state.lock(); + state.connection_id = Some(connection_id); + state.incoming = Some(incoming); + } + peer.send( + connection_id, + proto::Hello { + peer_id: Some(connection_id.into()), + }, + ) + .unwrap(); + + Ok(client_conn) + }) + } + }); + + client + .authenticate_and_connect(false, &cx.to_async()) + .await + .unwrap(); + + server + } + + pub fn disconnect(&self) { + if self.state.lock().connection_id.is_some() { + self.peer.disconnect(self.connection_id()); + let mut state = self.state.lock(); + state.connection_id.take(); + state.incoming.take(); + } + } + + pub fn auth_count(&self) -> usize { + self.state.lock().auth_count + } + + pub fn roll_access_token(&self) { + self.state.lock().access_token += 1; + } + + pub fn forbid_connections(&self) { + self.state.lock().forbid_connections = true; + } + + pub fn allow_connections(&self) { + self.state.lock().forbid_connections = false; + } + + pub fn send(&self, message: T) { + self.peer.send(self.connection_id(), message).unwrap(); + } + + #[allow(clippy::await_holding_lock)] + pub async fn receive(&self) -> Result> { + self.executor.start_waiting(); + + loop { + let message = self + .state + .lock() + .incoming + .as_mut() + .expect("not connected") + .next() + .await + .ok_or_else(|| anyhow!("other half hung up"))?; + self.executor.finish_waiting(); + let type_name = message.payload_type_name(); + let message = message.into_any(); + + if message.is::>() { + return Ok(*message.downcast().unwrap()); + } + + if message.is::>() { + self.respond( + message + .downcast::>() + .unwrap() + .receipt(), + GetPrivateUserInfoResponse { + metrics_id: "the-metrics-id".into(), + staff: false, + flags: Default::default(), + }, + ); + continue; + } + + panic!( + "fake server received unexpected message type: {:?}", + type_name + ); + } + } + + pub fn respond(&self, receipt: Receipt, response: T::Response) { + self.peer.respond(receipt, response).unwrap() + } + + fn connection_id(&self) -> ConnectionId { + self.state.lock().connection_id.expect("not connected") + } + + pub async fn build_user_store( + &self, + client: Arc, + cx: &mut TestAppContext, + ) -> ModelHandle { + let http_client = FakeHttpClient::with_404_response(); + let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); + assert_eq!( + self.receive::() + .await + .unwrap() + .payload + .user_ids, + &[self.user_id] + ); + user_store + } +} + +impl Drop for FakeServer { + fn drop(&mut self) { + self.disconnect(); + } +} diff --git a/crates/client2/src/user.rs b/crates/client2/src/user.rs new file mode 100644 index 0000000000000000000000000000000000000000..6aa41708e3ae3e3c3504ab82791278c8a1837c0a --- /dev/null +++ b/crates/client2/src/user.rs @@ -0,0 +1,737 @@ +use super::{proto, Client, Status, TypedEnvelope}; +use anyhow::{anyhow, Context, Result}; +use collections::{hash_map::Entry, HashMap, HashSet}; +use feature_flags::FeatureFlagAppExt; +use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt}; +use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task}; +use postage::{sink::Sink, watch}; +use rpc::proto::{RequestMessage, UsersResponse}; +use std::sync::{Arc, Weak}; +use text::ReplicaId; +use util::http::HttpClient; +use util::TryFutureExt as _; + +pub type UserId = u64; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ParticipantIndex(pub u32); + +#[derive(Default, Debug)] +pub struct User { + pub id: UserId, + pub github_login: String, + pub avatar: Option>, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Collaborator { + pub peer_id: proto::PeerId, + pub replica_id: ReplicaId, + pub user_id: UserId, +} + +impl PartialOrd for User { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for User { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.github_login.cmp(&other.github_login) + } +} + +impl PartialEq for User { + fn eq(&self, other: &Self) -> bool { + self.id == other.id && self.github_login == other.github_login + } +} + +impl Eq for User {} + +#[derive(Debug, PartialEq)] +pub struct Contact { + pub user: Arc, + pub online: bool, + pub busy: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ContactRequestStatus { + None, + RequestSent, + RequestReceived, + RequestAccepted, +} + +pub struct UserStore { + users: HashMap>, + participant_indices: HashMap, + update_contacts_tx: mpsc::UnboundedSender, + current_user: watch::Receiver>>, + contacts: Vec>, + incoming_contact_requests: Vec>, + outgoing_contact_requests: Vec>, + pending_contact_requests: HashMap, + invite_info: Option, + client: Weak, + http: Arc, + _maintain_contacts: Task<()>, + _maintain_current_user: Task<()>, +} + +#[derive(Clone)] +pub struct InviteInfo { + pub count: u32, + pub url: Arc, +} + +pub enum Event { + Contact { + user: Arc, + kind: ContactEventKind, + }, + ShowContacts, + ParticipantIndicesChanged, +} + +#[derive(Clone, Copy)] +pub enum ContactEventKind { + Requested, + Accepted, + Cancelled, +} + +impl Entity for UserStore { + type Event = Event; +} + +enum UpdateContacts { + Update(proto::UpdateContacts), + Wait(postage::barrier::Sender), + Clear(postage::barrier::Sender), +} + +impl UserStore { + pub fn new( + client: Arc, + http: Arc, + cx: &mut ModelContext, + ) -> Self { + let (mut current_user_tx, current_user_rx) = watch::channel(); + let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded(); + let rpc_subscriptions = vec![ + client.add_message_handler(cx.handle(), Self::handle_update_contacts), + client.add_message_handler(cx.handle(), Self::handle_update_invite_info), + client.add_message_handler(cx.handle(), Self::handle_show_contacts), + ]; + Self { + users: Default::default(), + current_user: current_user_rx, + contacts: Default::default(), + incoming_contact_requests: Default::default(), + participant_indices: Default::default(), + outgoing_contact_requests: Default::default(), + invite_info: None, + client: Arc::downgrade(&client), + update_contacts_tx, + http, + _maintain_contacts: cx.spawn_weak(|this, mut cx| async move { + let _subscriptions = rpc_subscriptions; + while let Some(message) = update_contacts_rx.next().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| this.update_contacts(message, cx)) + .log_err() + .await; + } + } + }), + _maintain_current_user: cx.spawn_weak(|this, mut cx| async move { + let mut status = client.status(); + while let Some(status) = status.next().await { + match status { + Status::Connected { .. } => { + if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) { + let fetch_user = this + .update(&mut cx, |this, cx| this.get_user(user_id, cx)) + .log_err(); + let fetch_metrics_id = + client.request(proto::GetPrivateUserInfo {}).log_err(); + let (user, info) = futures::join!(fetch_user, fetch_metrics_id); + + if let Some(info) = info { + cx.update(|cx| { + cx.update_flags(info.staff, info.flags); + client.telemetry.set_authenticated_user_info( + Some(info.metrics_id.clone()), + info.staff, + cx, + ) + }); + } else { + cx.read(|cx| { + client + .telemetry + .set_authenticated_user_info(None, false, cx) + }); + } + + current_user_tx.send(user).await.ok(); + + this.update(&mut cx, |_, cx| { + cx.notify(); + }); + } + } + Status::SignedOut => { + current_user_tx.send(None).await.ok(); + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + cx.notify(); + this.clear_contacts() + }) + .await; + } + } + Status::ConnectionLost => { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + cx.notify(); + this.clear_contacts() + }) + .await; + } + } + _ => {} + } + } + }), + pending_contact_requests: Default::default(), + } + } + + #[cfg(feature = "test-support")] + pub fn clear_cache(&mut self) { + self.users.clear(); + } + + async fn handle_update_invite_info( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.invite_info = Some(InviteInfo { + url: Arc::from(message.payload.url), + count: message.payload.count, + }); + cx.notify(); + }); + Ok(()) + } + + async fn handle_show_contacts( + this: ModelHandle, + _: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts)); + Ok(()) + } + + pub fn invite_info(&self) -> Option<&InviteInfo> { + self.invite_info.as_ref() + } + + async fn handle_update_contacts( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + this.update_contacts_tx + .unbounded_send(UpdateContacts::Update(message.payload)) + .unwrap(); + }); + Ok(()) + } + + fn update_contacts( + &mut self, + message: UpdateContacts, + cx: &mut ModelContext, + ) -> Task> { + match message { + UpdateContacts::Wait(barrier) => { + drop(barrier); + Task::ready(Ok(())) + } + UpdateContacts::Clear(barrier) => { + self.contacts.clear(); + self.incoming_contact_requests.clear(); + self.outgoing_contact_requests.clear(); + drop(barrier); + Task::ready(Ok(())) + } + UpdateContacts::Update(message) => { + let mut user_ids = HashSet::default(); + for contact in &message.contacts { + user_ids.insert(contact.user_id); + } + user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id)); + user_ids.extend(message.outgoing_requests.iter()); + + let load_users = self.get_users(user_ids.into_iter().collect(), cx); + cx.spawn(|this, mut cx| async move { + load_users.await?; + + // Users are fetched in parallel above and cached in call to get_users + // No need to paralellize here + let mut updated_contacts = Vec::new(); + for contact in message.contacts { + let should_notify = contact.should_notify; + updated_contacts.push(( + Arc::new(Contact::from_proto(contact, &this, &mut cx).await?), + should_notify, + )); + } + + let mut incoming_requests = Vec::new(); + for request in message.incoming_requests { + incoming_requests.push({ + let user = this + .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx)) + .await?; + (user, request.should_notify) + }); + } + + let mut outgoing_requests = Vec::new(); + for requested_user_id in message.outgoing_requests { + outgoing_requests.push( + this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx)) + .await?, + ); + } + + let removed_contacts = + HashSet::::from_iter(message.remove_contacts.iter().copied()); + let removed_incoming_requests = + HashSet::::from_iter(message.remove_incoming_requests.iter().copied()); + let removed_outgoing_requests = + HashSet::::from_iter(message.remove_outgoing_requests.iter().copied()); + + this.update(&mut cx, |this, cx| { + // Remove contacts + this.contacts + .retain(|contact| !removed_contacts.contains(&contact.user.id)); + // Update existing contacts and insert new ones + for (updated_contact, should_notify) in updated_contacts { + if should_notify { + cx.emit(Event::Contact { + user: updated_contact.user.clone(), + kind: ContactEventKind::Accepted, + }); + } + match this.contacts.binary_search_by_key( + &&updated_contact.user.github_login, + |contact| &contact.user.github_login, + ) { + Ok(ix) => this.contacts[ix] = updated_contact, + Err(ix) => this.contacts.insert(ix, updated_contact), + } + } + + // Remove incoming contact requests + this.incoming_contact_requests.retain(|user| { + if removed_incoming_requests.contains(&user.id) { + cx.emit(Event::Contact { + user: user.clone(), + kind: ContactEventKind::Cancelled, + }); + false + } else { + true + } + }); + // Update existing incoming requests and insert new ones + for (user, should_notify) in incoming_requests { + if should_notify { + cx.emit(Event::Contact { + user: user.clone(), + kind: ContactEventKind::Requested, + }); + } + + match this + .incoming_contact_requests + .binary_search_by_key(&&user.github_login, |contact| { + &contact.github_login + }) { + Ok(ix) => this.incoming_contact_requests[ix] = user, + Err(ix) => this.incoming_contact_requests.insert(ix, user), + } + } + + // Remove outgoing contact requests + this.outgoing_contact_requests + .retain(|user| !removed_outgoing_requests.contains(&user.id)); + // Update existing incoming requests and insert new ones + for request in outgoing_requests { + match this + .outgoing_contact_requests + .binary_search_by_key(&&request.github_login, |contact| { + &contact.github_login + }) { + Ok(ix) => this.outgoing_contact_requests[ix] = request, + Err(ix) => this.outgoing_contact_requests.insert(ix, request), + } + } + + cx.notify(); + }); + + Ok(()) + }) + } + } + } + + pub fn contacts(&self) -> &[Arc] { + &self.contacts + } + + pub fn has_contact(&self, user: &Arc) -> bool { + self.contacts + .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login) + .is_ok() + } + + pub fn incoming_contact_requests(&self) -> &[Arc] { + &self.incoming_contact_requests + } + + pub fn outgoing_contact_requests(&self) -> &[Arc] { + &self.outgoing_contact_requests + } + + pub fn is_contact_request_pending(&self, user: &User) -> bool { + self.pending_contact_requests.contains_key(&user.id) + } + + pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus { + if self + .contacts + .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login) + .is_ok() + { + ContactRequestStatus::RequestAccepted + } else if self + .outgoing_contact_requests + .binary_search_by_key(&&user.github_login, |user| &user.github_login) + .is_ok() + { + ContactRequestStatus::RequestSent + } else if self + .incoming_contact_requests + .binary_search_by_key(&&user.github_login, |user| &user.github_login) + .is_ok() + { + ContactRequestStatus::RequestReceived + } else { + ContactRequestStatus::None + } + } + + pub fn request_contact( + &mut self, + responder_id: u64, + cx: &mut ModelContext, + ) -> Task> { + self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx) + } + + pub fn remove_contact( + &mut self, + user_id: u64, + cx: &mut ModelContext, + ) -> Task> { + self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx) + } + + pub fn respond_to_contact_request( + &mut self, + requester_id: u64, + accept: bool, + cx: &mut ModelContext, + ) -> Task> { + self.perform_contact_request( + requester_id, + proto::RespondToContactRequest { + requester_id, + response: if accept { + proto::ContactRequestResponse::Accept + } else { + proto::ContactRequestResponse::Decline + } as i32, + }, + cx, + ) + } + + pub fn dismiss_contact_request( + &mut self, + requester_id: u64, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.upgrade(); + cx.spawn_weak(|_, _| async move { + client + .ok_or_else(|| anyhow!("can't upgrade client reference"))? + .request(proto::RespondToContactRequest { + requester_id, + response: proto::ContactRequestResponse::Dismiss as i32, + }) + .await?; + Ok(()) + }) + } + + fn perform_contact_request( + &mut self, + user_id: u64, + request: T, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.upgrade(); + *self.pending_contact_requests.entry(user_id).or_insert(0) += 1; + cx.notify(); + + cx.spawn(|this, mut cx| async move { + let response = client + .ok_or_else(|| anyhow!("can't upgrade client reference"))? + .request(request) + .await; + this.update(&mut cx, |this, cx| { + if let Entry::Occupied(mut request_count) = + this.pending_contact_requests.entry(user_id) + { + *request_count.get_mut() -= 1; + if *request_count.get() == 0 { + request_count.remove(); + } + } + cx.notify(); + }); + response?; + Ok(()) + }) + } + + pub fn clear_contacts(&mut self) -> impl Future { + let (tx, mut rx) = postage::barrier::channel(); + self.update_contacts_tx + .unbounded_send(UpdateContacts::Clear(tx)) + .unwrap(); + async move { + rx.next().await; + } + } + + pub fn contact_updates_done(&mut self) -> impl Future { + let (tx, mut rx) = postage::barrier::channel(); + self.update_contacts_tx + .unbounded_send(UpdateContacts::Wait(tx)) + .unwrap(); + async move { + rx.next().await; + } + } + + pub fn get_users( + &mut self, + user_ids: Vec, + cx: &mut ModelContext, + ) -> Task>>> { + let mut user_ids_to_fetch = user_ids.clone(); + user_ids_to_fetch.retain(|id| !self.users.contains_key(id)); + + cx.spawn(|this, mut cx| async move { + if !user_ids_to_fetch.is_empty() { + this.update(&mut cx, |this, cx| { + this.load_users( + proto::GetUsers { + user_ids: user_ids_to_fetch, + }, + cx, + ) + }) + .await?; + } + + this.read_with(&cx, |this, _| { + user_ids + .iter() + .map(|user_id| { + this.users + .get(user_id) + .cloned() + .ok_or_else(|| anyhow!("user {} not found", user_id)) + }) + .collect() + }) + }) + } + + pub fn fuzzy_search_users( + &mut self, + query: String, + cx: &mut ModelContext, + ) -> Task>>> { + self.load_users(proto::FuzzySearchUsers { query }, cx) + } + + pub fn get_cached_user(&self, user_id: u64) -> Option> { + self.users.get(&user_id).cloned() + } + + pub fn get_user( + &mut self, + user_id: u64, + cx: &mut ModelContext, + ) -> Task>> { + if let Some(user) = self.users.get(&user_id).cloned() { + return cx.foreground().spawn(async move { Ok(user) }); + } + + let load_users = self.get_users(vec![user_id], cx); + cx.spawn(|this, mut cx| async move { + load_users.await?; + this.update(&mut cx, |this, _| { + this.users + .get(&user_id) + .cloned() + .ok_or_else(|| anyhow!("server responded with no users")) + }) + }) + } + + pub fn current_user(&self) -> Option> { + self.current_user.borrow().clone() + } + + pub fn watch_current_user(&self) -> watch::Receiver>> { + self.current_user.clone() + } + + fn load_users( + &mut self, + request: impl RequestMessage, + cx: &mut ModelContext, + ) -> Task>>> { + let client = self.client.clone(); + let http = self.http.clone(); + cx.spawn_weak(|this, mut cx| async move { + if let Some(rpc) = client.upgrade() { + let response = rpc.request(request).await.context("error loading users")?; + let users = future::join_all( + response + .users + .into_iter() + .map(|user| User::new(user, http.as_ref())), + ) + .await; + + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, _| { + for user in &users { + this.users.insert(user.id, user.clone()); + } + }); + } + Ok(users) + } else { + Ok(Vec::new()) + } + }) + } + + pub fn set_participant_indices( + &mut self, + participant_indices: HashMap, + cx: &mut ModelContext, + ) { + if participant_indices != self.participant_indices { + self.participant_indices = participant_indices; + cx.emit(Event::ParticipantIndicesChanged); + } + } + + pub fn participant_indices(&self) -> &HashMap { + &self.participant_indices + } +} + +impl User { + async fn new(message: proto::User, http: &dyn HttpClient) -> Arc { + Arc::new(User { + id: message.id, + github_login: message.github_login, + avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await, + }) + } +} + +impl Contact { + async fn from_proto( + contact: proto::Contact, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> Result { + let user = user_store + .update(cx, |user_store, cx| { + user_store.get_user(contact.user_id, cx) + }) + .await?; + Ok(Self { + user, + online: contact.online, + busy: contact.busy, + }) + } +} + +impl Collaborator { + pub fn from_proto(message: proto::Collaborator) -> Result { + Ok(Self { + peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?, + replica_id: message.replica_id as ReplicaId, + user_id: message.user_id as UserId, + }) + } +} + +async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result> { + let mut response = http + .get(url, Default::default(), true) + .await + .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?; + + if !response.status().is_success() { + return Err(anyhow!("avatar request failed {:?}", response.status())); + } + + let mut body = Vec::new(); + response + .body_mut() + .read_to_end(&mut body) + .await + .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?; + let format = image::guess_format(&body)?; + let image = image::load_from_memory_with_format(&body, format)?.into_bgra8(); + Ok(ImageData::new(image)) +} diff --git a/crates/gpui2/src/app.rs b/crates/gpui2/src/app.rs index 16e22f6cefd96c155def13eba971a28a5c4123c5..88bb3b9eb563d416a144a622dd470fa37a1e3285 100644 --- a/crates/gpui2/src/app.rs +++ b/crates/gpui2/src/app.rs @@ -11,8 +11,8 @@ use smallvec::SmallVec; use crate::{ current_platform, image_cache::ImageCache, Action, AssetSource, Context, DisplayId, Executor, FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId, MainThread, MainThreadOnly, - Platform, SharedString, SubscriberSet, SvgRenderer, Task, TextStyle, TextStyleRefinement, - TextSystem, View, Window, WindowContext, WindowHandle, WindowId, + Platform, SemanticVersion, SharedString, SubscriberSet, SvgRenderer, Task, TextStyle, + TextStyleRefinement, TextSystem, View, Window, WindowContext, WindowHandle, WindowId, }; use anyhow::{anyhow, Result}; use collections::{HashMap, HashSet, VecDeque}; @@ -125,6 +125,18 @@ impl App { self } + pub fn app_version(&self) -> Result { + self.0.lock().platform.borrow_on_main_thread().app_version() + } + + pub fn os_name(&self) -> &'static str { + self.0.lock().platform.borrow_on_main_thread().os_name() + } + + pub fn os_version(&self) -> Result { + self.0.lock().platform.borrow_on_main_thread().os_version() + } + pub fn executor(&self) -> Executor { self.0.lock().executor.clone() } @@ -348,7 +360,7 @@ impl AppContext { .ok(); } - pub fn apply_refresh(&mut self) { + fn apply_refresh(&mut self) { for window in self.windows.values_mut() { if let Some(window) = window.as_mut() { window.dirty = true; diff --git a/crates/zed2/Cargo.toml b/crates/zed2/Cargo.toml index fc40c5f950459e67f3efe2e333af4cd48779606b..dc9204e241aeab8b3d008b561fdec1362cf1515b 100644 --- a/crates/zed2/Cargo.toml +++ b/crates/zed2/Cargo.toml @@ -27,7 +27,7 @@ collections = { path = "../collections" } # command_palette = { path = "../command_palette" } # component_test = { path = "../component_test" } # context_menu = { path = "../context_menu" } -# client = { path = "../client" } +client2 = { path = "../client2" } # clock = { path = "../clock" } # copilot = { path = "../copilot" } # copilot_button = { path = "../copilot_button" } diff --git a/crates/zed2/src/main.rs b/crates/zed2/src/main.rs index c0c1d7d788aabda38edffc38672b367429a9224d..bfec1bce29ab1a1f1ae6813fa37128370d432036 100644 --- a/crates/zed2/src/main.rs +++ b/crates/zed2/src/main.rs @@ -3,13 +3,15 @@ use crate::open_listener::{OpenListener, OpenRequest}; use anyhow::{anyhow, Context, Result}; +use backtrace::Backtrace; use cli::{ ipc::{self, IpcSender}, CliRequest, CliResponse, IpcHandshake, FORCE_CLI_MODE_ENV_VAR_NAME, }; use fs::RealFs; use futures::{channel::mpsc, SinkExt, StreamExt}; -use gpui2::{App, AssetSource, AsyncAppContext, Task}; +use gpui2::{App, AppContext, AssetSource, AsyncAppContext, SemanticVersion, Task}; +use isahc::{prelude::Configurable, Request}; use log::LevelFilter; use parking_lot::Mutex; @@ -19,17 +21,21 @@ use simplelog::ConfigBuilder; use smol::process::Command; use std::{ env, + ffi::OsStr, fs::OpenOptions, - io::IsTerminal, + io::{IsTerminal, Write}, + panic, path::Path, sync::{ atomic::{AtomicU32, Ordering}, Arc, }, thread, + time::{SystemTime, UNIX_EPOCH}, }; use util::{ - channel::{parse_zed_link, RELEASE_CHANNEL}, + channel::{parse_zed_link, ReleaseChannel, RELEASE_CHANNEL}, + http::HttpClient, paths, ResultExt, }; use zed2::{ensure_only_instance, AppState, Assets, IsOnlyInstance}; @@ -75,8 +81,8 @@ fn main() { let (listener, mut open_rx) = OpenListener::new(); let listener = Arc::new(listener); - let callback_listener = listener.clone(); - app.on_open_urls(move |urls, _| callback_listener.open_urls(urls)); + let open_listener = listener.clone(); + app.on_open_urls(move |urls, _| open_listener.open_urls(urls)); app.on_reopen(move |_cx| { // todo!("workspace") // if cx.has_global::>() { @@ -394,192 +400,184 @@ struct PanicRequest { token: String, } -static _PANIC_COUNT: AtomicU32 = AtomicU32::new(0); +static PANIC_COUNT: AtomicU32 = AtomicU32::new(0); -// fn init_panic_hook(app: &App, installation_id: Option, session_id: String) { -// let is_pty = stdout_is_a_pty(); -// let platform = app.platform(); +fn init_panic_hook(app: &App, installation_id: Option, session_id: String) { + let is_pty = stdout_is_a_pty(); + let app_version = app.app_version().ok(); + let os_name = app.os_name(); + let os_version = app.os_version().ok(); -// panic::set_hook(Box::new(move |info| { -// let prior_panic_count = PANIC_COUNT.fetch_add(1, Ordering::SeqCst); -// if prior_panic_count > 0 { -// // Give the panic-ing thread time to write the panic file -// loop { -// std::thread::yield_now(); -// } -// } + panic::set_hook(Box::new(move |info| { + let prior_panic_count = PANIC_COUNT.fetch_add(1, Ordering::SeqCst); + if prior_panic_count > 0 { + // Give the panic-ing thread time to write the panic file + loop { + std::thread::yield_now(); + } + } -// let thread = thread::current(); -// let thread_name = thread.name().unwrap_or(""); + let thread = thread::current(); + let thread_name = thread.name().unwrap_or(""); + + let payload = info + .payload() + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| info.payload().downcast_ref::().map(|s| s.clone())) + .unwrap_or_else(|| "Box".to_string()); + + if *util::channel::RELEASE_CHANNEL == ReleaseChannel::Dev { + let location = info.location().unwrap(); + let backtrace = Backtrace::new(); + eprintln!( + "Thread {:?} panicked with {:?} at {}:{}:{}\n{:?}", + thread_name, + payload, + location.file(), + location.line(), + location.column(), + backtrace, + ); + std::process::exit(-1); + } -// let payload = info -// .payload() -// .downcast_ref::<&str>() -// .map(|s| s.to_string()) -// .or_else(|| info.payload().downcast_ref::().map(|s| s.clone())) -// .unwrap_or_else(|| "Box".to_string()); + let app_version = client::ZED_APP_VERSION + .or(app_version) + .map_or("dev".to_string(), |v| v.to_string()); + + let backtrace = Backtrace::new(); + let mut backtrace = backtrace + .frames() + .iter() + .filter_map(|frame| Some(format!("{:#}", frame.symbols().first()?.name()?))) + .collect::>(); + + // Strip out leading stack frames for rust panic-handling. + if let Some(ix) = backtrace + .iter() + .position(|name| name == "rust_begin_unwind") + { + backtrace.drain(0..=ix); + } -// if *util::channel::RELEASE_CHANNEL == ReleaseChannel::Dev { -// let location = info.location().unwrap(); -// let backtrace = Backtrace::new(); -// eprintln!( -// "Thread {:?} panicked with {:?} at {}:{}:{}\n{:?}", -// thread_name, -// payload, -// location.file(), -// location.line(), -// location.column(), -// backtrace, -// ); -// std::process::exit(-1); -// } + let panic_data = Panic { + thread: thread_name.into(), + payload: payload.into(), + location_data: info.location().map(|location| LocationData { + file: location.file().into(), + line: location.line(), + }), + app_version: app_version.clone(), + release_channel: RELEASE_CHANNEL.display_name().into(), + os_name: os_name.into(), + os_version: os_version.as_ref().map(SemanticVersion::to_string), + architecture: env::consts::ARCH.into(), + panicked_on: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + backtrace, + installation_id: installation_id.clone(), + session_id: session_id.clone(), + }; + + if let Some(panic_data_json) = serde_json::to_string_pretty(&panic_data).log_err() { + log::error!("{}", panic_data_json); + } -// let app_version = ZED_APP_VERSION -// .or_else(|| platform.app_version().ok()) -// .map_or("dev".to_string(), |v| v.to_string()); - -// let backtrace = Backtrace::new(); -// let mut backtrace = backtrace -// .frames() -// .iter() -// .filter_map(|frame| Some(format!("{:#}", frame.symbols().first()?.name()?))) -// .collect::>(); - -// // Strip out leading stack frames for rust panic-handling. -// if let Some(ix) = backtrace -// .iter() -// .position(|name| name == "rust_begin_unwind") -// { -// backtrace.drain(0..=ix); -// } + if !is_pty { + if let Some(panic_data_json) = serde_json::to_string(&panic_data).log_err() { + let timestamp = chrono::Utc::now().format("%Y_%m_%d %H_%M_%S").to_string(); + let panic_file_path = paths::LOGS_DIR.join(format!("zed-{}.panic", timestamp)); + let panic_file = std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(&panic_file_path) + .log_err(); + if let Some(mut panic_file) = panic_file { + writeln!(&mut panic_file, "{}", panic_data_json).log_err(); + panic_file.flush().log_err(); + } + } + } -// let panic_data = Panic { -// thread: thread_name.into(), -// payload: payload.into(), -// location_data: info.location().map(|location| LocationData { -// file: location.file().into(), -// line: location.line(), -// }), -// app_version: app_version.clone(), -// release_channel: RELEASE_CHANNEL.display_name().into(), -// os_name: platform.os_name().into(), -// os_version: platform -// .os_version() -// .ok() -// .map(|os_version| os_version.to_string()), -// architecture: env::consts::ARCH.into(), -// panicked_on: SystemTime::now() -// .duration_since(UNIX_EPOCH) -// .unwrap() -// .as_millis(), -// backtrace, -// installation_id: installation_id.clone(), -// session_id: session_id.clone(), -// }; - -// if let Some(panic_data_json) = serde_json::to_string_pretty(&panic_data).log_err() { -// log::error!("{}", panic_data_json); -// } + std::process::abort(); + })); +} -// if !is_pty { -// if let Some(panic_data_json) = serde_json::to_string(&panic_data).log_err() { -// let timestamp = chrono::Utc::now().format("%Y_%m_%d %H_%M_%S").to_string(); -// let panic_file_path = paths::LOGS_DIR.join(format!("zed-{}.panic", timestamp)); -// let panic_file = std::fs::OpenOptions::new() -// .append(true) -// .create(true) -// .open(&panic_file_path) -// .log_err(); -// if let Some(mut panic_file) = panic_file { -// writeln!(&mut panic_file, "{}", panic_data_json).log_err(); -// panic_file.flush().log_err(); -// } -// } -// } +fn upload_previous_panics(http: Arc, cx: &mut AppContext) { + let telemetry_settings = *settings2::get::(cx); -// std::process::abort(); -// })); -// } + cx.executor() + .spawn(async move { + let panic_report_url = format!("{}/api/panic", &*client::ZED_SERVER_URL); + let mut children = smol::fs::read_dir(&*paths::LOGS_DIR).await?; + while let Some(child) = children.next().await { + let child = child?; + let child_path = child.path(); -// fn upload_previous_panics(http: Arc, cx: &mut AppContext) { -// let telemetry_settings = *settings::get::(cx); - -// cx.background() -// .spawn({ -// async move { -// let panic_report_url = format!("{}/api/panic", &*client::ZED_SERVER_URL); -// let mut children = smol::fs::read_dir(&*paths::LOGS_DIR).await?; -// while let Some(child) = children.next().await { -// let child = child?; -// let child_path = child.path(); - -// if child_path.extension() != Some(OsStr::new("panic")) { -// continue; -// } -// let filename = if let Some(filename) = child_path.file_name() { -// filename.to_string_lossy() -// } else { -// continue; -// }; - -// if !filename.starts_with("zed") { -// continue; -// } - -// if telemetry_settings.diagnostics { -// let panic_file_content = smol::fs::read_to_string(&child_path) -// .await -// .context("error reading panic file")?; - -// let panic = serde_json::from_str(&panic_file_content) -// .ok() -// .or_else(|| { -// panic_file_content -// .lines() -// .next() -// .and_then(|line| serde_json::from_str(line).ok()) -// }) -// .unwrap_or_else(|| { -// log::error!( -// "failed to deserialize panic file {:?}", -// panic_file_content -// ); -// None -// }); - -// if let Some(panic) = panic { -// let body = serde_json::to_string(&PanicRequest { -// panic, -// token: ZED_SECRET_CLIENT_TOKEN.into(), -// }) -// .unwrap(); - -// let request = Request::post(&panic_report_url) -// .redirect_policy(isahc::config::RedirectPolicy::Follow) -// .header("Content-Type", "application/json") -// .body(body.into())?; -// let response = -// http.send(request).await.context("error sending panic")?; -// if !response.status().is_success() { -// log::error!( -// "Error uploading panic to server: {}", -// response.status() -// ); -// } -// } -// } - -// // We've done what we can, delete the file -// std::fs::remove_file(child_path) -// .context("error removing panic") -// .log_err(); -// } -// Ok::<_, anyhow::Error>(()) -// } -// .log_err() -// }) -// .detach(); -// } + if child_path.extension() != Some(OsStr::new("panic")) { + continue; + } + let filename = if let Some(filename) = child_path.file_name() { + filename.to_string_lossy() + } else { + continue; + }; + + if !filename.starts_with("zed") { + continue; + } + + if telemetry_settings.diagnostics { + let panic_file_content = smol::fs::read_to_string(&child_path) + .await + .context("error reading panic file")?; + + let panic = serde_json::from_str(&panic_file_content) + .ok() + .or_else(|| { + panic_file_content + .lines() + .next() + .and_then(|line| serde_json::from_str(line).ok()) + }) + .unwrap_or_else(|| { + log::error!( + "failed to deserialize panic file {:?}", + panic_file_content + ); + None + }); + + if let Some(panic) = panic { + let body = serde_json::to_string(&PanicRequest { + panic, + token: client::ZED_SECRET_CLIENT_TOKEN.into(), + }) + .unwrap(); + + let request = Request::post(&panic_report_url) + .redirect_policy(isahc::config::RedirectPolicy::Follow) + .header("Content-Type", "application/json") + .body(body.into())?; + let response = http.send(request).await.context("error sending panic")?; + if !response.status().is_success() { + log::error!("Error uploading panic to server: {}", response.status()); + } + } + } + + // We've done what we can, delete the file + std::fs::remove_file(child_path) + .context("error removing panic") + .log_err(); + } + Ok::<_, anyhow::Error>(()) + }) + .detach_and_log_err(cx); +} async fn load_login_shell_environment() -> Result<()> { let marker = "ZED_LOGIN_SHELL_START";