events.rs

  1use super::ips_file::IpsFile;
  2use crate::api::CloudflareIpCountryHeader;
  3use crate::{api::slack, AppState, Error, Result};
  4use anyhow::anyhow;
  5use aws_sdk_s3::primitives::ByteStream;
  6use axum::{
  7    body::Bytes,
  8    headers::Header,
  9    http::{HeaderMap, HeaderName, StatusCode},
 10    routing::post,
 11    Extension, Router, TypedHeader,
 12};
 13use chrono::Duration;
 14use semantic_version::SemanticVersion;
 15use serde::{Deserialize, Serialize};
 16use serde_json::json;
 17use sha2::{Digest, Sha256};
 18use std::sync::{Arc, OnceLock};
 19use telemetry_events::{Event, EventRequestBody, Panic};
 20use util::ResultExt;
 21use uuid::Uuid;
 22
 23const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 24
 25pub fn router() -> Router {
 26    Router::new()
 27        .route("/telemetry/events", post(post_events))
 28        .route("/telemetry/crashes", post(post_crash))
 29        .route("/telemetry/panics", post(post_panic))
 30        .route("/telemetry/hangs", post(post_hang))
 31}
 32
 33pub struct ZedChecksumHeader(Vec<u8>);
 34
 35impl Header for ZedChecksumHeader {
 36    fn name() -> &'static HeaderName {
 37        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 38        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 39    }
 40
 41    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 42    where
 43        Self: Sized,
 44        I: Iterator<Item = &'i axum::http::HeaderValue>,
 45    {
 46        let checksum = values
 47            .next()
 48            .ok_or_else(axum::headers::Error::invalid)?
 49            .to_str()
 50            .map_err(|_| axum::headers::Error::invalid())?;
 51
 52        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 53        Ok(Self(bytes))
 54    }
 55
 56    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 57        unimplemented!()
 58    }
 59}
 60
 61pub async fn post_crash(
 62    Extension(app): Extension<Arc<AppState>>,
 63    headers: HeaderMap,
 64    body: Bytes,
 65) -> Result<()> {
 66    let report = IpsFile::parse(&body)?;
 67    let version_threshold = SemanticVersion::new(0, 123, 0);
 68
 69    let bundle_id = &report.header.bundle_id;
 70    let app_version = &report.app_version();
 71
 72    if bundle_id == "dev.zed.Zed-Dev" {
 73        log::error!("Crash uploads from {} are ignored.", bundle_id);
 74        return Ok(());
 75    }
 76
 77    if app_version.is_none() || app_version.unwrap() < version_threshold {
 78        log::error!(
 79            "Crash uploads from {} are ignored.",
 80            report.header.app_version
 81        );
 82        return Ok(());
 83    }
 84    let app_version = app_version.unwrap();
 85
 86    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 87        let response = blob_store_client
 88            .head_object()
 89            .bucket(CRASH_REPORTS_BUCKET)
 90            .key(report.header.incident_id.clone() + ".ips")
 91            .send()
 92            .await;
 93
 94        if response.is_ok() {
 95            log::info!("We've already uploaded this crash");
 96            return Ok(());
 97        }
 98
 99        blob_store_client
100            .put_object()
101            .bucket(CRASH_REPORTS_BUCKET)
102            .key(report.header.incident_id.clone() + ".ips")
103            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
104            .body(ByteStream::from(body.to_vec()))
105            .send()
106            .await
107            .map_err(|e| log::error!("Failed to upload crash: {}", e))
108            .ok();
109    }
110
111    let recent_panic_on: Option<i64> = headers
112        .get("x-zed-panicked-on")
113        .and_then(|h| h.to_str().ok())
114        .and_then(|s| s.parse().ok());
115
116    let installation_id = headers
117        .get("x-zed-installation-id")
118        .and_then(|h| h.to_str().ok())
119        .map(|s| s.to_string())
120        .unwrap_or_default();
121
122    let mut recent_panic = None;
123
124    if let Some(recent_panic_on) = recent_panic_on {
125        let crashed_at = match report.timestamp() {
126            Ok(t) => Some(t),
127            Err(e) => {
128                log::error!("Can't parse {}: {}", report.header.timestamp, e);
129                None
130            }
131        };
132        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
133            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
134        }
135    }
136
137    let description = report.description(recent_panic);
138    let summary = report.backtrace_summary();
139
140    tracing::error!(
141        service = "client",
142        version = %report.header.app_version,
143        os_version = %report.header.os_version,
144        bundle_id = %report.header.bundle_id,
145        incident_id = %report.header.incident_id,
146        installation_id = %installation_id,
147        description = %description,
148        backtrace = %summary,
149        "crash report"
150    );
151
152    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
153        let payload = slack::WebhookBody::new(|w| {
154            w.add_section(|s| s.text(slack::Text::markdown(description)))
155                .add_section(|s| {
156                    s.add_field(slack::Text::markdown(format!(
157                        "*Version:*\n{} ({})",
158                        bundle_id, app_version
159                    )))
160                    .add_field({
161                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
162                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
163                            hostname.strip_prefix("http://").unwrap_or_default()
164                        });
165
166                        slack::Text::markdown(format!(
167                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
168                            CRASH_REPORTS_BUCKET,
169                            hostname,
170                            report.header.incident_id,
171                            report
172                                .header
173                                .incident_id
174                                .chars()
175                                .take(8)
176                                .collect::<String>(),
177                        ))
178                    })
179                })
180                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
181        });
182        let payload_json = serde_json::to_string(&payload).map_err(|err| {
183            log::error!("Failed to serialize payload to JSON: {err}");
184            Error::Internal(anyhow!(err))
185        })?;
186
187        reqwest::Client::new()
188            .post(slack_panics_webhook)
189            .header("Content-Type", "application/json")
190            .body(payload_json)
191            .send()
192            .await
193            .map_err(|err| {
194                log::error!("Failed to send payload to Slack: {err}");
195                Error::Internal(anyhow!(err))
196            })?;
197    }
198
199    Ok(())
200}
201
202pub async fn post_hang(
203    Extension(app): Extension<Arc<AppState>>,
204    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
205    body: Bytes,
206) -> Result<()> {
207    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
208        return Err(Error::http(
209            StatusCode::INTERNAL_SERVER_ERROR,
210            "events not enabled".into(),
211        ))?;
212    };
213
214    if checksum != expected {
215        return Err(Error::http(
216            StatusCode::BAD_REQUEST,
217            "invalid checksum".into(),
218        ))?;
219    }
220
221    let incident_id = Uuid::new_v4().to_string();
222
223    // dump JSON into S3 so we can get frame offsets if we need to.
224    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
225        blob_store_client
226            .put_object()
227            .bucket(CRASH_REPORTS_BUCKET)
228            .key(incident_id.clone() + ".hang.json")
229            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
230            .body(ByteStream::from(body.to_vec()))
231            .send()
232            .await
233            .map_err(|e| log::error!("Failed to upload crash: {}", e))
234            .ok();
235    }
236
237    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
238        log::error!("can't parse report json: {err}");
239        Error::Internal(anyhow!(err))
240    })?;
241
242    let mut backtrace = "Possible hang detected on main thread:".to_string();
243    let unknown = "<unknown>".to_string();
244    for frame in report.backtrace.iter() {
245        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
246    }
247
248    tracing::error!(
249        service = "client",
250        version = %report.app_version.unwrap_or_default().to_string(),
251        os_name = %report.os_name,
252        os_version = report.os_version.unwrap_or_default().to_string(),
253        incident_id = %incident_id,
254        installation_id = %report.installation_id.unwrap_or_default(),
255        backtrace = %backtrace,
256        "hang report");
257
258    Ok(())
259}
260
261pub async fn post_panic(
262    Extension(app): Extension<Arc<AppState>>,
263    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
264    body: Bytes,
265) -> Result<()> {
266    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
267        return Err(Error::http(
268            StatusCode::INTERNAL_SERVER_ERROR,
269            "events not enabled".into(),
270        ))?;
271    };
272
273    if checksum != expected {
274        return Err(Error::http(
275            StatusCode::BAD_REQUEST,
276            "invalid checksum".into(),
277        ))?;
278    }
279
280    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
281        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
282    let panic = report.panic;
283
284    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
285        return Err(Error::http(
286            StatusCode::BAD_REQUEST,
287            "invalid os version".into(),
288        ))?;
289    }
290
291    tracing::error!(
292        service = "client",
293        version = %panic.app_version,
294        os_name = %panic.os_name,
295        os_version = %panic.os_version.clone().unwrap_or_default(),
296        installation_id = %panic.installation_id.clone().unwrap_or_default(),
297        description = %panic.payload,
298        backtrace = %panic.backtrace.join("\n"),
299        "panic report"
300    );
301
302    let backtrace = if panic.backtrace.len() > 25 {
303        let total = panic.backtrace.len();
304        format!(
305            "{}\n   and {} more",
306            panic
307                .backtrace
308                .iter()
309                .take(20)
310                .cloned()
311                .collect::<Vec<_>>()
312                .join("\n"),
313            total - 20
314        )
315    } else {
316        panic.backtrace.join("\n")
317    };
318
319    if !report_to_slack(&panic) {
320        return Ok(());
321    }
322
323    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
324
325    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
326        let payload = slack::WebhookBody::new(|w| {
327            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
328                .add_section(|s| {
329                    s.add_field(slack::Text::markdown(format!(
330                        "*Version:*\n {} ",
331                        panic.app_version
332                    )))
333                    .add_field({
334                        slack::Text::markdown(format!(
335                            "*OS:*\n{} {}",
336                            panic.os_name,
337                            panic.os_version.unwrap_or_default()
338                        ))
339                    })
340                })
341                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
342        });
343        let payload_json = serde_json::to_string(&payload).map_err(|err| {
344            log::error!("Failed to serialize payload to JSON: {err}");
345            Error::Internal(anyhow!(err))
346        })?;
347
348        reqwest::Client::new()
349            .post(slack_panics_webhook)
350            .header("Content-Type", "application/json")
351            .body(payload_json)
352            .send()
353            .await
354            .map_err(|err| {
355                log::error!("Failed to send payload to Slack: {err}");
356                Error::Internal(anyhow!(err))
357            })?;
358    }
359
360    Ok(())
361}
362
363fn report_to_slack(panic: &Panic) -> bool {
364    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
365        return false;
366    }
367
368    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
369        return false;
370    }
371
372    if panic
373        .payload
374        .contains("GPU has crashed, and no debug information is available")
375    {
376        return false;
377    }
378
379    true
380}
381
382pub async fn post_events(
383    Extension(app): Extension<Arc<AppState>>,
384    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
385    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
386    body: Bytes,
387) -> Result<()> {
388    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
389        return Err(Error::http(
390            StatusCode::INTERNAL_SERVER_ERROR,
391            "events not enabled".into(),
392        ))?;
393    };
394
395    let checksum_matched = checksum == expected;
396
397    let request_body: telemetry_events::EventRequestBody =
398        serde_json::from_slice(&body).map_err(|err| {
399            log::error!("can't parse event json: {err}");
400            Error::Internal(anyhow!(err))
401        })?;
402
403    let Some(last_event) = request_body.events.last() else {
404        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
405    };
406    let country_code = country_code_header.map(|h| h.to_string());
407
408    let first_event_at = chrono::Utc::now()
409        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
410
411    if let Some(kinesis_client) = app.kinesis_client.clone() {
412        if let Some(stream) = app.config.kinesis_stream.clone() {
413            let mut request = kinesis_client.put_records().stream_name(stream);
414            for row in for_snowflake(
415                request_body.clone(),
416                first_event_at,
417                country_code.clone(),
418                checksum_matched,
419            ) {
420                if let Some(data) = serde_json::to_vec(&row).log_err() {
421                    request = request.records(
422                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
423                            .partition_key(request_body.system_id.clone().unwrap_or_default())
424                            .data(data.into())
425                            .build()
426                            .unwrap(),
427                    );
428                }
429            }
430            request.send().await.log_err();
431        }
432    };
433
434    Ok(())
435}
436
437pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
438    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
439
440    let mut summer = Sha256::new();
441    summer.update(checksum_seed);
442    summer.update(json);
443    summer.update(checksum_seed);
444    Some(summer.finalize().into_iter().collect())
445}
446
447fn for_snowflake(
448    body: EventRequestBody,
449    first_event_at: chrono::DateTime<chrono::Utc>,
450    country_code: Option<String>,
451    checksum_matched: bool,
452) -> impl Iterator<Item = SnowflakeRow> {
453    body.events.into_iter().flat_map(move |event| {
454        let timestamp =
455            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
456        let (event_type, mut event_properties) = match &event.event {
457            Event::Editor(e) => (
458                match e.operation.as_str() {
459                    "open" => "Editor Opened".to_string(),
460                    "save" => "Editor Saved".to_string(),
461                    _ => format!("Unknown Editor Event: {}", e.operation),
462                },
463                serde_json::to_value(e).unwrap(),
464            ),
465            Event::InlineCompletion(e) => (
466                format!(
467                    "Inline Completion {}",
468                    if e.suggestion_accepted {
469                        "Accepted"
470                    } else {
471                        "Discarded"
472                    }
473                ),
474                serde_json::to_value(e).unwrap(),
475            ),
476            Event::InlineCompletionRating(e) => (
477                "Inline Completion Rated".to_string(),
478                serde_json::to_value(e).unwrap(),
479            ),
480            Event::Call(e) => {
481                let event_type = match e.operation.trim() {
482                    "unshare project" => "Project Unshared".to_string(),
483                    "open channel notes" => "Channel Notes Opened".to_string(),
484                    "share project" => "Project Shared".to_string(),
485                    "join channel" => "Channel Joined".to_string(),
486                    "hang up" => "Call Ended".to_string(),
487                    "accept incoming" => "Incoming Call Accepted".to_string(),
488                    "invite" => "Participant Invited".to_string(),
489                    "disable microphone" => "Microphone Disabled".to_string(),
490                    "enable microphone" => "Microphone Enabled".to_string(),
491                    "enable screen share" => "Screen Share Enabled".to_string(),
492                    "disable screen share" => "Screen Share Disabled".to_string(),
493                    "decline incoming" => "Incoming Call Declined".to_string(),
494                    _ => format!("Unknown Call Event: {}", e.operation),
495                };
496
497                (event_type, serde_json::to_value(e).unwrap())
498            }
499            Event::Assistant(e) => (
500                match e.phase {
501                    telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
502                    telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
503                    telemetry_events::AssistantPhase::Accepted => {
504                        "Assistant Response Accepted".to_string()
505                    }
506                    telemetry_events::AssistantPhase::Rejected => {
507                        "Assistant Response Rejected".to_string()
508                    }
509                },
510                serde_json::to_value(e).unwrap(),
511            ),
512            Event::Cpu(_) | Event::Memory(_) => return None,
513            Event::App(e) => {
514                let mut properties = json!({});
515                let event_type = match e.operation.trim() {
516                    // App
517                    "open" => "App Opened".to_string(),
518                    "first open" => "App First Opened".to_string(),
519                    "first open for release channel" => {
520                        "App First Opened For Release Channel".to_string()
521                    }
522                    "close" => "App Closed".to_string(),
523
524                    // Project
525                    "open project" => "Project Opened".to_string(),
526                    "open node project" => {
527                        properties["project_type"] = json!("node");
528                        "Project Opened".to_string()
529                    }
530                    "open pnpm project" => {
531                        properties["project_type"] = json!("pnpm");
532                        "Project Opened".to_string()
533                    }
534                    "open yarn project" => {
535                        properties["project_type"] = json!("yarn");
536                        "Project Opened".to_string()
537                    }
538
539                    // SSH
540                    "create ssh server" => "SSH Server Created".to_string(),
541                    "create ssh project" => "SSH Project Created".to_string(),
542                    "open ssh project" => "SSH Project Opened".to_string(),
543
544                    // Welcome Page
545                    "welcome page: change keymap" => "Welcome Keymap Changed".to_string(),
546                    "welcome page: change theme" => "Welcome Theme Changed".to_string(),
547                    "welcome page: close" => "Welcome Page Closed".to_string(),
548                    "welcome page: edit settings" => "Welcome Settings Edited".to_string(),
549                    "welcome page: install cli" => "Welcome CLI Installed".to_string(),
550                    "welcome page: open" => "Welcome Page Opened".to_string(),
551                    "welcome page: open extensions" => "Welcome Extensions Page Opened".to_string(),
552                    "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
553                    "welcome page: toggle diagnostic telemetry" => {
554                        "Welcome Diagnostic Telemetry Toggled".to_string()
555                    }
556                    "welcome page: toggle metric telemetry" => {
557                        "Welcome Metric Telemetry Toggled".to_string()
558                    }
559                    "welcome page: toggle vim" => "Welcome Vim Mode Toggled".to_string(),
560                    "welcome page: view docs" => "Welcome Documentation Viewed".to_string(),
561
562                    // Extensions
563                    "extensions page: open" => "Extensions Page Opened".to_string(),
564                    "extensions: install extension" => "Extension Installed".to_string(),
565                    "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
566
567                    // Misc
568                    "markdown preview: open" => "Markdown Preview Opened".to_string(),
569                    "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
570                    "project search: open" => "Project Search Opened".to_string(),
571                    "repl sessions: open" => "REPL Session Started".to_string(),
572
573                    // Feature Upsell
574                    "feature upsell: toggle vim" => {
575                        properties["source"] = json!("Feature Upsell");
576                        "Vim Mode Toggled".to_string()
577                    }
578                    _ => e
579                        .operation
580                        .strip_prefix("feature upsell: viewed docs (")
581                        .and_then(|s| s.strip_suffix(')'))
582                        .map_or_else(
583                            || format!("Unknown App Event: {}", e.operation),
584                            |docs_url| {
585                                properties["url"] = json!(docs_url);
586                                properties["source"] = json!("Feature Upsell");
587                                "Documentation Viewed".to_string()
588                            },
589                        ),
590                };
591                (event_type, properties)
592            }
593            Event::Setting(e) => (
594                "Settings Changed".to_string(),
595                serde_json::to_value(e).unwrap(),
596            ),
597            Event::Extension(e) => (
598                "Extension Loaded".to_string(),
599                serde_json::to_value(e).unwrap(),
600            ),
601            Event::Edit(e) => (
602                "Editor Edited".to_string(),
603                serde_json::to_value(e).unwrap(),
604            ),
605            Event::Action(e) => (
606                "Action Invoked".to_string(),
607                serde_json::to_value(e).unwrap(),
608            ),
609            Event::Repl(e) => (
610                "Kernel Status Changed".to_string(),
611                serde_json::to_value(e).unwrap(),
612            ),
613        };
614
615        if let serde_json::Value::Object(ref mut map) = event_properties {
616            map.insert("app_version".to_string(), body.app_version.clone().into());
617            map.insert("os_name".to_string(), body.os_name.clone().into());
618            map.insert("os_version".to_string(), body.os_version.clone().into());
619            map.insert("architecture".to_string(), body.architecture.clone().into());
620            map.insert(
621                "release_channel".to_string(),
622                body.release_channel.clone().into(),
623            );
624            map.insert("signed_in".to_string(), event.signed_in.into());
625            map.insert("checksum_matched".to_string(), checksum_matched.into());
626            if let Some(country_code) = country_code.as_ref() {
627                map.insert("country".to_string(), country_code.clone().into());
628            }
629        }
630
631        // NOTE: most amplitude user properties are read out of our event_properties
632        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
633        // for how that is configured.
634        let user_properties = Some(serde_json::json!({
635            "is_staff": body.is_staff,
636        }));
637
638        Some(SnowflakeRow {
639            time: timestamp,
640            user_id: body.metrics_id.clone(),
641            device_id: body.system_id.clone(),
642            event_type,
643            event_properties,
644            user_properties,
645            insert_id: Some(Uuid::new_v4().to_string()),
646        })
647    })
648}
649
650#[derive(Serialize, Deserialize, Debug)]
651pub struct SnowflakeRow {
652    pub time: chrono::DateTime<chrono::Utc>,
653    pub user_id: Option<String>,
654    pub device_id: Option<String>,
655    pub event_type: String,
656    pub event_properties: serde_json::Value,
657    pub user_properties: Option<serde_json::Value>,
658    pub insert_id: Option<String>,
659}
660
661impl SnowflakeRow {
662    pub fn new(
663        event_type: impl Into<String>,
664        metrics_id: Uuid,
665        is_staff: bool,
666        system_id: Option<String>,
667        event_properties: serde_json::Value,
668    ) -> Self {
669        Self {
670            time: chrono::Utc::now(),
671            event_type: event_type.into(),
672            device_id: system_id,
673            user_id: Some(metrics_id.to_string()),
674            insert_id: Some(uuid::Uuid::new_v4().to_string()),
675            event_properties,
676            user_properties: Some(json!({"is_staff": is_staff})),
677        }
678    }
679
680    pub async fn write(
681        self,
682        client: &Option<aws_sdk_kinesis::Client>,
683        stream: &Option<String>,
684    ) -> anyhow::Result<()> {
685        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
686            return Ok(());
687        };
688        let row = serde_json::to_vec(&self)?;
689        client
690            .put_record()
691            .stream_name(stream)
692            .partition_key(&self.user_id.unwrap_or_default())
693            .data(row.into())
694            .send()
695            .await?;
696        Ok(())
697    }
698}