Start work on a client-side telemetry system

Max Brunsfeld created

Change summary

Cargo.lock                                  |   1 
crates/client/Cargo.toml                    |   1 
crates/client/src/channel.rs                |   2 
crates/client/src/client.rs                 |  31 ++++-
crates/client/src/telemetry.rs              | 128 +++++++++++++++++++++++
crates/collab/src/integration_tests.rs      |   2 
crates/contacts_panel/src/contacts_panel.rs |   2 
crates/gpui/src/platform.rs                 |   1 
crates/gpui/src/platform/mac/platform.rs    |  17 ++
crates/gpui/src/platform/test.rs            |   8 +
crates/project/src/project.rs               |   2 
crates/project/src/worktree.rs              |  14 -
crates/workspace/src/workspace.rs           |   2 
crates/zed/src/main.rs                      |  10 -
14 files changed, 191 insertions(+), 30 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -955,6 +955,7 @@ dependencies = [
  "postage",
  "rand 0.8.5",
  "rpc",
+ "serde",
  "smol",
  "sum_tree",
  "thiserror",

crates/client/Cargo.toml 🔗

@@ -32,6 +32,7 @@ thiserror = "1.0.29"
 time = { version = "0.3", features = ["serde", "serde-well-known"] }
 tiny_http = "0.8"
 url = "2.2"
+serde = { version = "*", features = ["derive"] }
 
 [dev-dependencies]
 collections = { path = "../collections", features = ["test-support"] }

crates/client/src/channel.rs 🔗

@@ -601,7 +601,7 @@ mod tests {
 
         let user_id = 5;
         let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = cx.update(|cx| Client::new(http_client.clone(), cx));
         let server = FakeServer::for_client(user_id, &client, cx).await;
 
         Channel::init(&client);

crates/client/src/client.rs 🔗

@@ -3,6 +3,7 @@ pub mod test;
 
 pub mod channel;
 pub mod http;
+pub mod telemetry;
 pub mod user;
 
 use anyhow::{anyhow, Context, Result};
@@ -13,8 +14,9 @@ use async_tungstenite::tungstenite::{
 };
 use futures::{future::LocalBoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
 use gpui::{
-    actions, AnyModelHandle, AnyViewHandle, AnyWeakModelHandle, AnyWeakViewHandle, AsyncAppContext,
-    Entity, ModelContext, ModelHandle, MutableAppContext, Task, View, ViewContext, ViewHandle,
+    actions, serde_json::Value, AnyModelHandle, AnyViewHandle, AnyWeakModelHandle,
+    AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
+    MutableAppContext, Task, View, ViewContext, ViewHandle,
 };
 use http::HttpClient;
 use lazy_static::lazy_static;
@@ -31,6 +33,7 @@ use std::{
     sync::{Arc, Weak},
     time::{Duration, Instant},
 };
+use telemetry::Telemetry;
 use thiserror::Error;
 use url::Url;
 use util::{ResultExt, TryFutureExt};
@@ -63,6 +66,7 @@ pub struct Client {
     id: usize,
     peer: Arc<Peer>,
     http: Arc<dyn HttpClient>,
+    telemetry: Arc<Telemetry>,
     state: RwLock<ClientState>,
 
     #[allow(clippy::type_complexity)]
@@ -232,10 +236,11 @@ impl Drop for Subscription {
 }
 
 impl Client {
-    pub fn new(http: Arc<dyn HttpClient>) -> Arc<Self> {
+    pub fn new(http: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
         Arc::new(Self {
             id: 0,
             peer: Peer::new(),
+            telemetry: Telemetry::new(http.clone(), cx),
             http,
             state: Default::default(),
 
@@ -595,6 +600,9 @@ impl Client {
         if credentials.is_none() && try_keychain {
             credentials = read_credentials_from_keychain(cx);
             read_from_keychain = credentials.is_some();
+            if read_from_keychain {
+                self.log_event("read_credentials_from_keychain", Default::default());
+            }
         }
         if credentials.is_none() {
             let mut status_rx = self.status();
@@ -878,6 +886,7 @@ impl Client {
     ) -> Task<Result<Credentials>> {
         let platform = cx.platform();
         let executor = cx.background();
+        let telemetry = self.telemetry.clone();
         executor.clone().spawn(async move {
             // Generate a pair of asymmetric encryption keys. The public key will be used by the
             // zed server to encrypt the user's access token, so that it can'be intercepted by
@@ -956,6 +965,8 @@ impl Client {
                 .context("failed to decrypt access token")?;
             platform.activate(true);
 
+            telemetry.log_event("authenticate_with_browser", Default::default());
+
             Ok(Credentials {
                 user_id: user_id.parse()?,
                 access_token,
@@ -1020,6 +1031,10 @@ impl Client {
         log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME);
         self.peer.respond_with_error(receipt, error)
     }
+
+    pub fn log_event(&self, kind: &str, properties: Value) {
+        self.telemetry.log_event(kind, properties)
+    }
 }
 
 impl AnyWeakEntityHandle {
@@ -1085,7 +1100,7 @@ mod tests {
         cx.foreground().forbid_parking();
 
         let user_id = 5;
-        let client = Client::new(FakeHttpClient::with_404_response());
+        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let server = FakeServer::for_client(user_id, &client, cx).await;
         let mut status = client.status();
         assert!(matches!(
@@ -1124,7 +1139,7 @@ mod tests {
 
         let auth_count = Arc::new(Mutex::new(0));
         let dropped_auth_count = Arc::new(Mutex::new(0));
-        let client = Client::new(FakeHttpClient::with_404_response());
+        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         client.override_authenticate({
             let auth_count = auth_count.clone();
             let dropped_auth_count = dropped_auth_count.clone();
@@ -1173,7 +1188,7 @@ mod tests {
         cx.foreground().forbid_parking();
 
         let user_id = 5;
-        let client = Client::new(FakeHttpClient::with_404_response());
+        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let server = FakeServer::for_client(user_id, &client, cx).await;
 
         let (done_tx1, mut done_rx1) = smol::channel::unbounded();
@@ -1219,7 +1234,7 @@ mod tests {
         cx.foreground().forbid_parking();
 
         let user_id = 5;
-        let client = Client::new(FakeHttpClient::with_404_response());
+        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let server = FakeServer::for_client(user_id, &client, cx).await;
 
         let model = cx.add_model(|_| Model::default());
@@ -1247,7 +1262,7 @@ mod tests {
         cx.foreground().forbid_parking();
 
         let user_id = 5;
-        let client = Client::new(FakeHttpClient::with_404_response());
+        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let server = FakeServer::for_client(user_id, &client, cx).await;
 
         let model = cx.add_model(|_| Model::default());

crates/client/src/telemetry.rs 🔗

@@ -0,0 +1,128 @@
+use crate::{http::HttpClient, ZED_SECRET_CLIENT_TOKEN};
+use gpui::{
+    executor::Background,
+    serde_json::{self, value::Map, Value},
+    AppContext, AppVersion, Task,
+};
+use isahc::Request;
+use parking_lot::Mutex;
+use serde::Serialize;
+use std::{
+    mem,
+    sync::Arc,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+use util::ResultExt;
+
+pub struct Telemetry {
+    client: Arc<dyn HttpClient>,
+    executor: Arc<Background>,
+    state: Mutex<TelemetryState>,
+}
+
+#[derive(Default)]
+struct TelemetryState {
+    metrics_id: Option<i32>,
+    device_id: Option<String>,
+    app_version: Option<AppVersion>,
+    os_version: Option<AppVersion>,
+    queue: Vec<Event>,
+    flush_task: Option<Task<()>>,
+}
+
+#[derive(Serialize)]
+struct RecordEventParams {
+    token: &'static str,
+    metrics_id: Option<i32>,
+    device_id: Option<String>,
+    app_version: Option<String>,
+    os_version: Option<String>,
+    events: Vec<Event>,
+}
+
+#[derive(Serialize)]
+struct Event {
+    #[serde(rename = "type")]
+    kind: String,
+    time: u128,
+    properties: Option<Map<String, Value>>,
+}
+
+const MAX_QUEUE_LEN: usize = 30;
+const EVENTS_URI: &'static str = "https://zed.dev/api/telemetry";
+const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
+
+impl Telemetry {
+    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
+        let platform = cx.platform();
+        Arc::new(Self {
+            client,
+            executor: cx.background().clone(),
+            state: Mutex::new(TelemetryState {
+                os_version: platform.os_version().log_err(),
+                app_version: platform.app_version().log_err(),
+                metrics_id: None,
+                device_id: None,
+                queue: Default::default(),
+                flush_task: Default::default(),
+            }),
+        })
+    }
+
+    pub fn set_metrics_id(&self, metrics_id: Option<i32>) {
+        self.state.lock().metrics_id = metrics_id;
+    }
+
+    pub fn log_event(self: &Arc<Self>, kind: &str, properties: Value) {
+        let mut state = self.state.lock();
+        state.queue.push(Event {
+            kind: kind.to_string(),
+            time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis(),
+            properties: if let Value::Object(properties) = properties {
+                Some(properties)
+            } else {
+                None
+            },
+        });
+        if state.queue.len() >= MAX_QUEUE_LEN {
+            self.flush();
+        } else {
+            let this = self.clone();
+            let executor = self.executor.clone();
+            state.flush_task = Some(self.executor.spawn(async move {
+                executor.timer(DEBOUNCE_INTERVAL).await;
+                this.flush();
+            }));
+        }
+    }
+
+    fn flush(&self) {
+        let mut state = self.state.lock();
+        let events = mem::take(&mut state.queue);
+        let client = self.client.clone();
+        let app_version = state.app_version;
+        let os_version = state.os_version;
+        let metrics_id = state.metrics_id;
+        let device_id = state.device_id.clone();
+        state.flush_task.take();
+        self.executor
+            .spawn(async move {
+                let body = serde_json::to_vec(&RecordEventParams {
+                    token: ZED_SECRET_CLIENT_TOKEN,
+                    events,
+                    app_version: app_version.map(|v| v.to_string()),
+                    os_version: os_version.map(|v| v.to_string()),
+                    metrics_id,
+                    device_id,
+                })
+                .log_err()?;
+                let request = Request::post(EVENTS_URI).body(body.into()).log_err()?;
+                client.send(request).await.log_err();
+                Some(())
+            })
+            .detach();
+    }
+}

crates/collab/src/integration_tests.rs 🔗

@@ -5196,7 +5196,7 @@ impl TestServer {
                 .unwrap()
         };
         let client_name = name.to_string();
-        let mut client = Client::new(http.clone());
+        let mut client = cx.read(|cx| Client::new(http.clone(), cx));
         let server = self.server.clone();
         let db = self.app_state.db.clone();
         let connection_killers = self.connection_killers.clone();

crates/contacts_panel/src/contacts_panel.rs 🔗

@@ -1216,7 +1216,7 @@ mod tests {
 
         let languages = Arc::new(LanguageRegistry::test());
         let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = cx.read(|cx| Client::new(http_client.clone(), cx));
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
         let project_store = cx.add_model(|_| ProjectStore::new(project::Db::open_fake()));
         let server = FakeServer::for_client(current_user_id, &client, cx).await;

crates/gpui/src/platform.rs 🔗

@@ -69,6 +69,7 @@ pub trait Platform: Send + Sync {
     fn path_for_auxiliary_executable(&self, name: &str) -> Result<PathBuf>;
     fn app_path(&self) -> Result<PathBuf>;
     fn app_version(&self) -> Result<AppVersion>;
+    fn os_version(&self) -> Result<AppVersion>;
 }
 
 pub(crate) trait ForegroundPlatform {

crates/gpui/src/platform/mac/platform.rs 🔗

@@ -4,7 +4,7 @@ use super::{
 use crate::{
     executor, keymap,
     platform::{self, CursorStyle},
-    Action, ClipboardItem, Event, Menu, MenuItem,
+    Action, AppVersion, ClipboardItem, Event, Menu, MenuItem,
 };
 use anyhow::{anyhow, Result};
 use block::ConcreteBlock;
@@ -16,7 +16,8 @@ use cocoa::{
     },
     base::{id, nil, selector, YES},
     foundation::{
-        NSArray, NSAutoreleasePool, NSBundle, NSData, NSInteger, NSString, NSUInteger, NSURL,
+        NSArray, NSAutoreleasePool, NSBundle, NSData, NSInteger, NSProcessInfo, NSString,
+        NSUInteger, NSURL,
     },
 };
 use core_foundation::{
@@ -748,6 +749,18 @@ impl platform::Platform for MacPlatform {
             }
         }
     }
+
+    fn os_version(&self) -> Result<crate::AppVersion> {
+        unsafe {
+            let process_info = NSProcessInfo::processInfo(nil);
+            let version = process_info.operatingSystemVersion();
+            Ok(AppVersion {
+                major: version.majorVersion as usize,
+                minor: version.minorVersion as usize,
+                patch: version.patchVersion as usize,
+            })
+        }
+    }
 }
 
 unsafe fn path_from_objc(path: id) -> PathBuf {

crates/gpui/src/platform/test.rs 🔗

@@ -196,6 +196,14 @@ impl super::Platform for Platform {
             patch: 0,
         })
     }
+
+    fn os_version(&self) -> Result<AppVersion> {
+        Ok(AppVersion {
+            major: 1,
+            minor: 0,
+            patch: 0,
+        })
+    }
 }
 
 impl Window {

crates/project/src/project.rs 🔗

@@ -650,7 +650,7 @@ impl Project {
 
         let languages = Arc::new(LanguageRegistry::test());
         let http_client = client::test::FakeHttpClient::with_404_response();
-        let client = client::Client::new(http_client.clone());
+        let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
         let project_store = cx.add_model(|_| ProjectStore::new(Db::open_fake()));
         let project = cx.update(|cx| {

crates/project/src/worktree.rs 🔗

@@ -2804,7 +2804,7 @@ mod tests {
         .await;
 
         let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client);
+        let client = cx.read(|cx| Client::new(http_client, cx));
 
         let tree = Worktree::local(
             client,
@@ -2866,8 +2866,7 @@ mod tests {
         fs.insert_symlink("/root/lib/a/lib", "..".into()).await;
         fs.insert_symlink("/root/lib/b/lib", "..".into()).await;
 
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client);
+        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let tree = Worktree::local(
             client,
             Arc::from(Path::new("/root")),
@@ -2945,8 +2944,7 @@ mod tests {
         }));
         let dir = parent_dir.path().join("tree");
 
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
 
         let tree = Worktree::local(
             client,
@@ -3016,8 +3014,7 @@ mod tests {
             "ignored-dir": {}
         }));
 
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
 
         let tree = Worktree::local(
             client,
@@ -3064,8 +3061,7 @@ mod tests {
 
     #[gpui::test(iterations = 30)]
     async fn test_create_directory(cx: &mut TestAppContext) {
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
 
         let fs = FakeFs::new(cx.background());
         fs.insert_tree(

crates/workspace/src/workspace.rs 🔗

@@ -856,7 +856,7 @@ impl AppState {
         let fs = project::FakeFs::new(cx.background().clone());
         let languages = Arc::new(LanguageRegistry::test());
         let http_client = client::test::FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
+        let client = Client::new(http_client.clone(), cx);
         let project_store = cx.add_model(|_| ProjectStore::new(project::Db::open_fake()));
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
         let themes = ThemeRegistry::new((), cx.font_cache().clone());

crates/zed/src/main.rs 🔗

@@ -20,7 +20,7 @@ use futures::{
     FutureExt, SinkExt, StreamExt,
 };
 use gpui::{executor::Background, App, AssetSource, AsyncAppContext, Task, ViewContext};
-use isahc::{config::Configurable, AsyncBody, Request};
+use isahc::{config::Configurable, Request};
 use language::LanguageRegistry;
 use log::LevelFilter;
 use parking_lot::Mutex;
@@ -88,7 +88,7 @@ fn main() {
     });
 
     app.run(move |cx| {
-        let client = client::Client::new(http.clone());
+        let client = client::Client::new(http.clone(), cx);
         let mut languages = LanguageRegistry::new(login_shell_env_loaded);
         languages.set_language_server_download_dir(zed::paths::LANGUAGES_DIR.clone());
         let languages = Arc::new(languages);
@@ -280,12 +280,10 @@ fn init_panic_hook(app_version: String, http: Arc<dyn HttpClient>, background: A
                         "token": ZED_SECRET_CLIENT_TOKEN,
                     }))
                     .unwrap();
-                    let request = Request::builder()
-                        .uri(&panic_report_url)
-                        .method(http::Method::POST)
+                    let request = Request::post(&panic_report_url)
                         .redirect_policy(isahc::config::RedirectPolicy::Follow)
                         .header("Content-Type", "application/json")
-                        .body(AsyncBody::from(body))?;
+                        .body(body.into())?;
                     let response = http.send(request).await.context("error sending panic")?;
                     if response.status().is_success() {
                         fs::remove_file(child_path)