collab: Add internal `POST /snowflake/events` endpoint (#23842)

Marshall Bowers created

This PR adds a new internal `POST /snowflake/events` endpoint to collab.

This endpoint is protected with the admin token like our other internal
endpoints.

This endpoint accepts a `SnowflakeRow` in the body and writes it to the
AWS Kinesis stream.

Release Notes:

- N/A

Change summary

crates/collab/src/api.rs | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)

Detailed changes

crates/collab/src/api.rs 🔗

@@ -5,6 +5,7 @@ pub mod extensions;
 pub mod ips_file;
 pub mod slack;
 
+use crate::api::events::SnowflakeRow;
 use crate::{
     auth,
     db::{User, UserId},
@@ -99,6 +100,7 @@ pub fn routes(rpc_server: Arc<rpc::Server>) -> Router<(), Body> {
         .route("/user", get(get_authenticated_user))
         .route("/users/:id/access_tokens", post(create_access_token))
         .route("/rpc_server_snapshot", get(get_rpc_server_snapshot))
+        .route("/snowflake/events", post(write_snowflake_event))
         .merge(billing::router())
         .merge(contributors::router())
         .layer(
@@ -245,3 +247,19 @@ async fn create_access_token(
         encrypted_access_token,
     }))
 }
+
+/// An endpoint that writes a Snowflake event to our event stream.
+///
+/// This endpoint is exposed such that other internal services can write
+/// telemetry events without needing to talk to AWS Kinesis directly.
+async fn write_snowflake_event(
+    Extension(app): Extension<Arc<AppState>>,
+    Json(event): Json<SnowflakeRow>,
+) -> Result<()> {
+    let kinesis_client = app.kinesis_client.clone();
+    let kinesis_stream = app.config.kinesis_stream.clone();
+
+    event.write(&kinesis_client, &kinesis_stream).await?;
+
+    Ok(())
+}