usages.rs

  1use crate::db::UserId;
  2use crate::llm::Cents;
  3use chrono::{Datelike, Duration};
  4use futures::StreamExt as _;
  5use rpc::LanguageModelProvider;
  6use sea_orm::QuerySelect;
  7use std::{iter, str::FromStr};
  8use strum::IntoEnumIterator as _;
  9
 10use super::*;
 11
 12#[derive(Debug, PartialEq, Clone, Copy, Default, serde::Serialize)]
 13pub struct TokenUsage {
 14    pub input: usize,
 15    pub input_cache_creation: usize,
 16    pub input_cache_read: usize,
 17    pub output: usize,
 18}
 19
 20impl TokenUsage {
 21    pub fn total(&self) -> usize {
 22        self.input + self.input_cache_creation + self.input_cache_read + self.output
 23    }
 24}
 25
 26#[derive(Debug, PartialEq, Clone, Copy, serde::Serialize)]
 27pub struct Usage {
 28    pub requests_this_minute: usize,
 29    pub tokens_this_minute: usize,
 30    pub tokens_this_day: usize,
 31    pub tokens_this_month: TokenUsage,
 32    pub spending_this_month: Cents,
 33    pub lifetime_spending: Cents,
 34}
 35
 36#[derive(Debug, PartialEq, Clone)]
 37pub struct ApplicationWideUsage {
 38    pub provider: LanguageModelProvider,
 39    pub model: String,
 40    pub requests_this_minute: usize,
 41    pub tokens_this_minute: usize,
 42}
 43
 44#[derive(Clone, Copy, Debug, Default)]
 45pub struct ActiveUserCount {
 46    pub users_in_recent_minutes: usize,
 47    pub users_in_recent_days: usize,
 48}
 49
 50impl LlmDatabase {
 51    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 52        let all_measures = self
 53            .transaction(|tx| async move {
 54                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 55
 56                let new_measures = UsageMeasure::iter()
 57                    .filter(|measure| {
 58                        !existing_measures
 59                            .iter()
 60                            .any(|m| m.name == measure.to_string())
 61                    })
 62                    .map(|measure| usage_measure::ActiveModel {
 63                        name: ActiveValue::set(measure.to_string()),
 64                        ..Default::default()
 65                    })
 66                    .collect::<Vec<_>>();
 67
 68                if !new_measures.is_empty() {
 69                    usage_measure::Entity::insert_many(new_measures)
 70                        .exec(&*tx)
 71                        .await?;
 72                }
 73
 74                Ok(usage_measure::Entity::find().all(&*tx).await?)
 75            })
 76            .await?;
 77
 78        self.usage_measure_ids = all_measures
 79            .into_iter()
 80            .filter_map(|measure| {
 81                UsageMeasure::from_str(&measure.name)
 82                    .ok()
 83                    .map(|um| (um, measure.id))
 84            })
 85            .collect();
 86        Ok(())
 87    }
 88
 89    pub async fn get_application_wide_usages_by_model(
 90        &self,
 91        now: DateTimeUtc,
 92    ) -> Result<Vec<ApplicationWideUsage>> {
 93        self.transaction(|tx| async move {
 94            let past_minute = now - Duration::minutes(1);
 95            let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
 96            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
 97
 98            let mut results = Vec::new();
 99            for ((provider, model_name), model) in self.models.iter() {
100                let mut usages = usage::Entity::find()
101                    .filter(
102                        usage::Column::Timestamp
103                            .gte(past_minute.naive_utc())
104                            .and(usage::Column::IsStaff.eq(false))
105                            .and(usage::Column::ModelId.eq(model.id))
106                            .and(
107                                usage::Column::MeasureId
108                                    .eq(requests_per_minute)
109                                    .or(usage::Column::MeasureId.eq(tokens_per_minute)),
110                            ),
111                    )
112                    .stream(&*tx)
113                    .await?;
114
115                let mut requests_this_minute = 0;
116                let mut tokens_this_minute = 0;
117                while let Some(usage) = usages.next().await {
118                    let usage = usage?;
119                    if usage.measure_id == requests_per_minute {
120                        requests_this_minute += Self::get_live_buckets(
121                            &usage,
122                            now.naive_utc(),
123                            UsageMeasure::RequestsPerMinute,
124                        )
125                        .0
126                        .iter()
127                        .copied()
128                        .sum::<i64>() as usize;
129                    } else if usage.measure_id == tokens_per_minute {
130                        tokens_this_minute += Self::get_live_buckets(
131                            &usage,
132                            now.naive_utc(),
133                            UsageMeasure::TokensPerMinute,
134                        )
135                        .0
136                        .iter()
137                        .copied()
138                        .sum::<i64>() as usize;
139                    }
140                }
141
142                results.push(ApplicationWideUsage {
143                    provider: *provider,
144                    model: model_name.clone(),
145                    requests_this_minute,
146                    tokens_this_minute,
147                })
148            }
149
150            Ok(results)
151        })
152        .await
153    }
154
155    pub async fn get_user_spending_for_month(
156        &self,
157        user_id: UserId,
158        now: DateTimeUtc,
159    ) -> Result<Cents> {
160        self.transaction(|tx| async move {
161            let month = now.date_naive().month() as i32;
162            let year = now.date_naive().year();
163
164            let mut monthly_usages = monthly_usage::Entity::find()
165                .filter(
166                    monthly_usage::Column::UserId
167                        .eq(user_id)
168                        .and(monthly_usage::Column::Month.eq(month))
169                        .and(monthly_usage::Column::Year.eq(year)),
170                )
171                .stream(&*tx)
172                .await?;
173            let mut monthly_spending = Cents::ZERO;
174
175            while let Some(usage) = monthly_usages.next().await {
176                let usage = usage?;
177                let Ok(model) = self.model_by_id(usage.model_id) else {
178                    continue;
179                };
180
181                monthly_spending += calculate_spending(
182                    model,
183                    usage.input_tokens as usize,
184                    usage.cache_creation_input_tokens as usize,
185                    usage.cache_read_input_tokens as usize,
186                    usage.output_tokens as usize,
187                );
188            }
189
190            Ok(monthly_spending)
191        })
192        .await
193    }
194
195    pub async fn get_usage(
196        &self,
197        user_id: UserId,
198        provider: LanguageModelProvider,
199        model_name: &str,
200        now: DateTimeUtc,
201    ) -> Result<Usage> {
202        self.transaction(|tx| async move {
203            let model = self
204                .models
205                .get(&(provider, model_name.to_string()))
206                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
207
208            let usages = usage::Entity::find()
209                .filter(
210                    usage::Column::UserId
211                        .eq(user_id)
212                        .and(usage::Column::ModelId.eq(model.id)),
213                )
214                .all(&*tx)
215                .await?;
216
217            let month = now.date_naive().month() as i32;
218            let year = now.date_naive().year();
219            let monthly_usage = monthly_usage::Entity::find()
220                .filter(
221                    monthly_usage::Column::UserId
222                        .eq(user_id)
223                        .and(monthly_usage::Column::ModelId.eq(model.id))
224                        .and(monthly_usage::Column::Month.eq(month))
225                        .and(monthly_usage::Column::Year.eq(year)),
226                )
227                .one(&*tx)
228                .await?;
229            let lifetime_usage = lifetime_usage::Entity::find()
230                .filter(
231                    lifetime_usage::Column::UserId
232                        .eq(user_id)
233                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
234                )
235                .one(&*tx)
236                .await?;
237
238            let requests_this_minute =
239                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
240            let tokens_this_minute =
241                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
242            let tokens_this_day =
243                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
244            let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
245                calculate_spending(
246                    model,
247                    monthly_usage.input_tokens as usize,
248                    monthly_usage.cache_creation_input_tokens as usize,
249                    monthly_usage.cache_read_input_tokens as usize,
250                    monthly_usage.output_tokens as usize,
251                )
252            } else {
253                Cents::ZERO
254            };
255            let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
256                calculate_spending(
257                    model,
258                    lifetime_usage.input_tokens as usize,
259                    lifetime_usage.cache_creation_input_tokens as usize,
260                    lifetime_usage.cache_read_input_tokens as usize,
261                    lifetime_usage.output_tokens as usize,
262                )
263            } else {
264                Cents::ZERO
265            };
266
267            Ok(Usage {
268                requests_this_minute,
269                tokens_this_minute,
270                tokens_this_day,
271                tokens_this_month: TokenUsage {
272                    input: monthly_usage
273                        .as_ref()
274                        .map_or(0, |usage| usage.input_tokens as usize),
275                    input_cache_creation: monthly_usage
276                        .as_ref()
277                        .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
278                    input_cache_read: monthly_usage
279                        .as_ref()
280                        .map_or(0, |usage| usage.cache_read_input_tokens as usize),
281                    output: monthly_usage
282                        .as_ref()
283                        .map_or(0, |usage| usage.output_tokens as usize),
284                },
285                spending_this_month,
286                lifetime_spending,
287            })
288        })
289        .await
290    }
291
292    pub async fn record_usage(
293        &self,
294        user_id: UserId,
295        is_staff: bool,
296        provider: LanguageModelProvider,
297        model_name: &str,
298        tokens: TokenUsage,
299        has_llm_subscription: bool,
300        max_monthly_spend: Cents,
301        free_tier_monthly_spending_limit: Cents,
302        now: DateTimeUtc,
303    ) -> Result<Usage> {
304        self.transaction(|tx| async move {
305            let model = self.model(provider, model_name)?;
306
307            let usages = usage::Entity::find()
308                .filter(
309                    usage::Column::UserId
310                        .eq(user_id)
311                        .and(usage::Column::ModelId.eq(model.id)),
312                )
313                .all(&*tx)
314                .await?;
315
316            let requests_this_minute = self
317                .update_usage_for_measure(
318                    user_id,
319                    is_staff,
320                    model.id,
321                    &usages,
322                    UsageMeasure::RequestsPerMinute,
323                    now,
324                    1,
325                    &tx,
326                )
327                .await?;
328            let tokens_this_minute = self
329                .update_usage_for_measure(
330                    user_id,
331                    is_staff,
332                    model.id,
333                    &usages,
334                    UsageMeasure::TokensPerMinute,
335                    now,
336                    tokens.total(),
337                    &tx,
338                )
339                .await?;
340            let tokens_this_day = self
341                .update_usage_for_measure(
342                    user_id,
343                    is_staff,
344                    model.id,
345                    &usages,
346                    UsageMeasure::TokensPerDay,
347                    now,
348                    tokens.total(),
349                    &tx,
350                )
351                .await?;
352
353            let month = now.date_naive().month() as i32;
354            let year = now.date_naive().year();
355
356            // Update monthly usage
357            let monthly_usage = monthly_usage::Entity::find()
358                .filter(
359                    monthly_usage::Column::UserId
360                        .eq(user_id)
361                        .and(monthly_usage::Column::ModelId.eq(model.id))
362                        .and(monthly_usage::Column::Month.eq(month))
363                        .and(monthly_usage::Column::Year.eq(year)),
364                )
365                .one(&*tx)
366                .await?;
367
368            let monthly_usage = match monthly_usage {
369                Some(usage) => {
370                    monthly_usage::Entity::update(monthly_usage::ActiveModel {
371                        id: ActiveValue::unchanged(usage.id),
372                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
373                        cache_creation_input_tokens: ActiveValue::set(
374                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
375                        ),
376                        cache_read_input_tokens: ActiveValue::set(
377                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
378                        ),
379                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
380                        ..Default::default()
381                    })
382                    .exec(&*tx)
383                    .await?
384                }
385                None => {
386                    monthly_usage::ActiveModel {
387                        user_id: ActiveValue::set(user_id),
388                        model_id: ActiveValue::set(model.id),
389                        month: ActiveValue::set(month),
390                        year: ActiveValue::set(year),
391                        input_tokens: ActiveValue::set(tokens.input as i64),
392                        cache_creation_input_tokens: ActiveValue::set(
393                            tokens.input_cache_creation as i64,
394                        ),
395                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
396                        output_tokens: ActiveValue::set(tokens.output as i64),
397                        ..Default::default()
398                    }
399                    .insert(&*tx)
400                    .await?
401                }
402            };
403
404            let spending_this_month = calculate_spending(
405                model,
406                monthly_usage.input_tokens as usize,
407                monthly_usage.cache_creation_input_tokens as usize,
408                monthly_usage.cache_read_input_tokens as usize,
409                monthly_usage.output_tokens as usize,
410            );
411
412            if !is_staff
413                && spending_this_month > free_tier_monthly_spending_limit
414                && has_llm_subscription
415                && (spending_this_month - free_tier_monthly_spending_limit) <= max_monthly_spend
416            {
417                billing_event::ActiveModel {
418                    id: ActiveValue::not_set(),
419                    idempotency_key: ActiveValue::not_set(),
420                    user_id: ActiveValue::set(user_id),
421                    model_id: ActiveValue::set(model.id),
422                    input_tokens: ActiveValue::set(tokens.input as i64),
423                    input_cache_creation_tokens: ActiveValue::set(
424                        tokens.input_cache_creation as i64,
425                    ),
426                    input_cache_read_tokens: ActiveValue::set(tokens.input_cache_read as i64),
427                    output_tokens: ActiveValue::set(tokens.output as i64),
428                }
429                .insert(&*tx)
430                .await?;
431            }
432
433            // Update lifetime usage
434            let lifetime_usage = lifetime_usage::Entity::find()
435                .filter(
436                    lifetime_usage::Column::UserId
437                        .eq(user_id)
438                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
439                )
440                .one(&*tx)
441                .await?;
442
443            let lifetime_usage = match lifetime_usage {
444                Some(usage) => {
445                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
446                        id: ActiveValue::unchanged(usage.id),
447                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
448                        cache_creation_input_tokens: ActiveValue::set(
449                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
450                        ),
451                        cache_read_input_tokens: ActiveValue::set(
452                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
453                        ),
454                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
455                        ..Default::default()
456                    })
457                    .exec(&*tx)
458                    .await?
459                }
460                None => {
461                    lifetime_usage::ActiveModel {
462                        user_id: ActiveValue::set(user_id),
463                        model_id: ActiveValue::set(model.id),
464                        input_tokens: ActiveValue::set(tokens.input as i64),
465                        cache_creation_input_tokens: ActiveValue::set(
466                            tokens.input_cache_creation as i64,
467                        ),
468                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
469                        output_tokens: ActiveValue::set(tokens.output as i64),
470                        ..Default::default()
471                    }
472                    .insert(&*tx)
473                    .await?
474                }
475            };
476
477            let lifetime_spending = calculate_spending(
478                model,
479                lifetime_usage.input_tokens as usize,
480                lifetime_usage.cache_creation_input_tokens as usize,
481                lifetime_usage.cache_read_input_tokens as usize,
482                lifetime_usage.output_tokens as usize,
483            );
484
485            Ok(Usage {
486                requests_this_minute,
487                tokens_this_minute,
488                tokens_this_day,
489                tokens_this_month: TokenUsage {
490                    input: monthly_usage.input_tokens as usize,
491                    input_cache_creation: monthly_usage.cache_creation_input_tokens as usize,
492                    input_cache_read: monthly_usage.cache_read_input_tokens as usize,
493                    output: monthly_usage.output_tokens as usize,
494                },
495                spending_this_month,
496                lifetime_spending,
497            })
498        })
499        .await
500    }
501
502    /// Returns the active user count for the specified model.
503    pub async fn get_active_user_count(
504        &self,
505        provider: LanguageModelProvider,
506        model_name: &str,
507        now: DateTimeUtc,
508    ) -> Result<ActiveUserCount> {
509        self.transaction(|tx| async move {
510            let minute_since = now - Duration::minutes(5);
511            let day_since = now - Duration::days(5);
512
513            let model = self
514                .models
515                .get(&(provider, model_name.to_string()))
516                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
517
518            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
519
520            let users_in_recent_minutes = usage::Entity::find()
521                .filter(
522                    usage::Column::ModelId
523                        .eq(model.id)
524                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
525                        .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
526                        .and(usage::Column::IsStaff.eq(false)),
527                )
528                .select_only()
529                .column(usage::Column::UserId)
530                .group_by(usage::Column::UserId)
531                .count(&*tx)
532                .await? as usize;
533
534            let users_in_recent_days = usage::Entity::find()
535                .filter(
536                    usage::Column::ModelId
537                        .eq(model.id)
538                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
539                        .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
540                        .and(usage::Column::IsStaff.eq(false)),
541                )
542                .select_only()
543                .column(usage::Column::UserId)
544                .group_by(usage::Column::UserId)
545                .count(&*tx)
546                .await? as usize;
547
548            Ok(ActiveUserCount {
549                users_in_recent_minutes,
550                users_in_recent_days,
551            })
552        })
553        .await
554    }
555
556    async fn update_usage_for_measure(
557        &self,
558        user_id: UserId,
559        is_staff: bool,
560        model_id: ModelId,
561        usages: &[usage::Model],
562        usage_measure: UsageMeasure,
563        now: DateTimeUtc,
564        usage_to_add: usize,
565        tx: &DatabaseTransaction,
566    ) -> Result<usize> {
567        let now = now.naive_utc();
568        let measure_id = *self
569            .usage_measure_ids
570            .get(&usage_measure)
571            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
572
573        let mut id = None;
574        let mut timestamp = now;
575        let mut buckets = vec![0_i64];
576
577        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
578            id = Some(old_usage.id);
579            let (live_buckets, buckets_since) =
580                Self::get_live_buckets(old_usage, now, usage_measure);
581            if !live_buckets.is_empty() {
582                buckets.clear();
583                buckets.extend_from_slice(live_buckets);
584                buckets.extend(iter::repeat(0).take(buckets_since));
585                timestamp =
586                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
587            }
588        }
589
590        *buckets.last_mut().unwrap() += usage_to_add as i64;
591        let total_usage = buckets.iter().sum::<i64>() as usize;
592
593        let mut model = usage::ActiveModel {
594            user_id: ActiveValue::set(user_id),
595            is_staff: ActiveValue::set(is_staff),
596            model_id: ActiveValue::set(model_id),
597            measure_id: ActiveValue::set(measure_id),
598            timestamp: ActiveValue::set(timestamp),
599            buckets: ActiveValue::set(buckets),
600            ..Default::default()
601        };
602
603        if let Some(id) = id {
604            model.id = ActiveValue::unchanged(id);
605            model.update(tx).await?;
606        } else {
607            usage::Entity::insert(model)
608                .exec_without_returning(tx)
609                .await?;
610        }
611
612        Ok(total_usage)
613    }
614
615    fn get_usage_for_measure(
616        &self,
617        usages: &[usage::Model],
618        now: DateTimeUtc,
619        usage_measure: UsageMeasure,
620    ) -> Result<usize> {
621        let now = now.naive_utc();
622        let measure_id = *self
623            .usage_measure_ids
624            .get(&usage_measure)
625            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
626        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
627            return Ok(0);
628        };
629
630        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
631        Ok(live_buckets.iter().sum::<i64>() as _)
632    }
633
634    fn get_live_buckets(
635        usage: &usage::Model,
636        now: chrono::NaiveDateTime,
637        measure: UsageMeasure,
638    ) -> (&[i64], usize) {
639        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
640        let buckets_since_usage =
641            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
642        let buckets_since_usage = buckets_since_usage.ceil() as usize;
643        let mut live_buckets = &[] as &[i64];
644        if buckets_since_usage < measure.bucket_count() {
645            let expired_bucket_count =
646                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
647            live_buckets = &usage.buckets[expired_bucket_count..];
648            while live_buckets.first() == Some(&0) {
649                live_buckets = &live_buckets[1..];
650            }
651        }
652        (live_buckets, buckets_since_usage)
653    }
654}
655
656fn calculate_spending(
657    model: &model::Model,
658    input_tokens_this_month: usize,
659    cache_creation_input_tokens_this_month: usize,
660    cache_read_input_tokens_this_month: usize,
661    output_tokens_this_month: usize,
662) -> Cents {
663    let input_token_cost =
664        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
665    let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
666        * model.price_per_million_cache_creation_input_tokens as usize
667        / 1_000_000;
668    let cache_read_input_token_cost = cache_read_input_tokens_this_month
669        * model.price_per_million_cache_read_input_tokens as usize
670        / 1_000_000;
671    let output_token_cost =
672        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
673    let spending = input_token_cost
674        + cache_creation_input_token_cost
675        + cache_read_input_token_cost
676        + output_token_cost;
677    Cents::new(spending as u32)
678}
679
/// Buckets in each per-minute rolling window: 60 s split into 5 s buckets.
const MINUTE_BUCKET_COUNT: usize = 60 / 5;
/// Buckets in the per-day rolling window: 24 h split into 30 min buckets.
const DAY_BUCKET_COUNT: usize = 24 * 2;
682
683impl UsageMeasure {
684    fn bucket_count(&self) -> usize {
685        match self {
686            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
687            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
688            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
689        }
690    }
691
692    fn total_duration(&self) -> Duration {
693        match self {
694            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
695            UsageMeasure::TokensPerMinute => Duration::minutes(1),
696            UsageMeasure::TokensPerDay => Duration::hours(24),
697        }
698    }
699
700    fn bucket_duration(&self) -> Duration {
701        self.total_duration() / self.bucket_count() as i32
702    }
703}