collab: Fix writing LLM rate limit events to Clickhouse (#16367)

Marshall Bowers created

This PR fixes the writing of LLM rate limit events to Clickhouse.

We had a table in the table name: `llm_rate_limits` instead of
`llm_rate_limit_events`.

I also extracted a helper function to write to Clickhouse so we can use
it anywhere we need to.

Release Notes:

- N/A

Change summary

crates/collab/src/api/events.rs    | 51 ++++++++-----------------------
crates/collab/src/clickhouse.rs    | 28 +++++++++++++++++
crates/collab/src/lib.rs           |  7 ++-
crates/collab/src/llm/telemetry.rs | 18 ++++++----
4 files changed, 56 insertions(+), 48 deletions(-)

Detailed changes

crates/collab/src/api/events.rs 🔗

@@ -1,5 +1,6 @@
 use super::ips_file::IpsFile;
 use crate::api::CloudflareIpCountryHeader;
+use crate::clickhouse::write_to_table;
 use crate::{api::slack, AppState, Error, Result};
 use anyhow::{anyhow, Context};
 use aws_sdk_s3::primitives::ByteStream;
@@ -529,12 +530,12 @@ struct ToUpload {
 impl ToUpload {
     pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
         const EDITOR_EVENTS_TABLE: &str = "editor_events";
-        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
+        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 
         const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
-        Self::upload_to_table(
+        write_to_table(
             INLINE_COMPLETION_EVENTS_TABLE,
             &self.inline_completion_events,
             clickhouse_client,
@@ -543,7 +544,7 @@ impl ToUpload {
         .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 
         const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
-        Self::upload_to_table(
+        write_to_table(
             ASSISTANT_EVENTS_TABLE,
             &self.assistant_events,
             clickhouse_client,
@@ -552,27 +553,27 @@ impl ToUpload {
         .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 
         const CALL_EVENTS_TABLE: &str = "call_events";
-        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
+        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 
         const CPU_EVENTS_TABLE: &str = "cpu_events";
-        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
+        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 
         const MEMORY_EVENTS_TABLE: &str = "memory_events";
-        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
+        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 
         const APP_EVENTS_TABLE: &str = "app_events";
-        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
+        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 
         const SETTING_EVENTS_TABLE: &str = "setting_events";
-        Self::upload_to_table(
+        write_to_table(
             SETTING_EVENTS_TABLE,
             &self.setting_events,
             clickhouse_client,
@@ -581,7 +582,7 @@ impl ToUpload {
         .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 
         const EXTENSION_EVENTS_TABLE: &str = "extension_events";
-        Self::upload_to_table(
+        write_to_table(
             EXTENSION_EVENTS_TABLE,
             &self.extension_events,
             clickhouse_client,
@@ -590,48 +591,22 @@ impl ToUpload {
         .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 
         const EDIT_EVENTS_TABLE: &str = "edit_events";
-        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
+        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 
         const ACTION_EVENTS_TABLE: &str = "action_events";
-        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
+        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 
         const REPL_EVENTS_TABLE: &str = "repl_events";
-        Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
+        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
             .await
             .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 
         Ok(())
     }
-
-    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
-        table: &str,
-        rows: &[T],
-        clickhouse_client: &clickhouse::Client,
-    ) -> anyhow::Result<()> {
-        if rows.is_empty() {
-            return Ok(());
-        }
-
-        let mut insert = clickhouse_client.insert(table)?;
-
-        for event in rows {
-            insert.write(event).await?;
-        }
-
-        insert.end().await?;
-
-        let event_count = rows.len();
-        log::info!(
-            "wrote {event_count} {event_specifier} to '{table}'",
-            event_specifier = if event_count == 1 { "event" } else { "events" }
-        );
-
-        Ok(())
-    }
 }
 
 pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>

crates/collab/src/clickhouse.rs 🔗

@@ -0,0 +1,28 @@
+use serde::Serialize;
+
+/// Writes the given rows to the specified Clickhouse table.
+pub async fn write_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
+    table: &str,
+    rows: &[T],
+    clickhouse_client: &clickhouse::Client,
+) -> anyhow::Result<()> {
+    if rows.is_empty() {
+        return Ok(());
+    }
+
+    let mut insert = clickhouse_client.insert(table)?;
+
+    for event in rows {
+        insert.write(event).await?;
+    }
+
+    insert.end().await?;
+
+    let event_count = rows.len();
+    log::info!(
+        "wrote {event_count} {event_specifier} to '{table}'",
+        event_specifier = if event_count == 1 { "event" } else { "events" }
+    );
+
+    Ok(())
+}

crates/collab/src/lib.rs 🔗

@@ -1,5 +1,6 @@
 pub mod api;
 pub mod auth;
+pub mod clickhouse;
 pub mod db;
 pub mod env;
 pub mod executor;
@@ -267,7 +268,7 @@ pub struct AppState {
     pub stripe_client: Option<Arc<stripe::Client>>,
     pub rate_limiter: Arc<RateLimiter>,
     pub executor: Executor,
-    pub clickhouse_client: Option<clickhouse::Client>,
+    pub clickhouse_client: Option<::clickhouse::Client>,
     pub config: Config,
 }
 
@@ -358,8 +359,8 @@ async fn build_blob_store_client(config: &Config) -> anyhow::Result<aws_sdk_s3::
     Ok(aws_sdk_s3::Client::new(&s3_config))
 }
 
-fn build_clickhouse_client(config: &Config) -> anyhow::Result<clickhouse::Client> {
-    Ok(clickhouse::Client::default()
+fn build_clickhouse_client(config: &Config) -> anyhow::Result<::clickhouse::Client> {
+    Ok(::clickhouse::Client::default()
         .with_url(
             config
                 .clickhouse_url

crates/collab/src/llm/telemetry.rs 🔗

@@ -1,6 +1,8 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
 use serde::Serialize;
 
+use crate::clickhouse::write_to_table;
+
 #[derive(Serialize, Debug, clickhouse::Row)]
 pub struct LlmUsageEventRow {
     pub time: i64,
@@ -40,9 +42,10 @@ pub struct LlmRateLimitEventRow {
 }
 
 pub async fn report_llm_usage(client: &clickhouse::Client, row: LlmUsageEventRow) -> Result<()> {
-    let mut insert = client.insert("llm_usage_events")?;
-    insert.write(&row).await?;
-    insert.end().await?;
+    const LLM_USAGE_EVENTS_TABLE: &str = "llm_usage_events";
+    write_to_table(LLM_USAGE_EVENTS_TABLE, &[row], client)
+        .await
+        .with_context(|| format!("failed to upload to table '{LLM_USAGE_EVENTS_TABLE}'"))?;
     Ok(())
 }
 
@@ -50,8 +53,9 @@ pub async fn report_llm_rate_limit(
     client: &clickhouse::Client,
     row: LlmRateLimitEventRow,
 ) -> Result<()> {
-    let mut insert = client.insert("llm_rate_limits")?;
-    insert.write(&row).await?;
-    insert.end().await?;
+    const LLM_RATE_LIMIT_EVENTS_TABLE: &str = "llm_rate_limit_events";
+    write_to_table(LLM_RATE_LIMIT_EVENTS_TABLE, &[row], client)
+        .await
+        .with_context(|| format!("failed to upload to table '{LLM_RATE_LIMIT_EVENTS_TABLE}'"))?;
     Ok(())
 }