Change summary
crates/collab/src/api/events.rs | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
Detailed changes
@@ -418,7 +418,7 @@ pub async fn post_events(
if let Some(kinesis_client) = app.kinesis_client.clone() {
if let Some(stream) = app.config.kinesis_stream.clone() {
let mut request = kinesis_client.put_records().stream_name(stream);
- for row in for_snowflake(request_body.clone(), first_event_at) {
+ for row in for_snowflake(request_body.clone(), first_event_at, country_code.clone()) {
if let Some(data) = serde_json::to_vec(&row).log_err() {
request = request.records(
aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
@@ -1381,6 +1381,7 @@ pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> O
fn for_snowflake(
body: EventRequestBody,
first_event_at: chrono::DateTime<chrono::Utc>,
+ country_code: Option<String>,
) -> impl Iterator<Item = SnowflakeRow> {
body.events.into_iter().flat_map(move |event| {
let timestamp =
@@ -1553,6 +1554,9 @@ fn for_snowflake(
body.release_channel.clone().into(),
);
map.insert("signed_in".to_string(), event.signed_in.into());
+ if let Some(country_code) = country_code.as_ref() {
+ map.insert("country_code".to_string(), country_code.clone().into());
+ }
}
let user_properties = Some(serde_json::json!({