usages.rs

  1use crate::db::UserId;
  2use crate::llm::Cents;
  3use chrono::{Datelike, Duration};
  4use futures::StreamExt as _;
  5use rpc::LanguageModelProvider;
  6use sea_orm::QuerySelect;
  7use std::{iter, str::FromStr};
  8use strum::IntoEnumIterator as _;
  9
 10use super::*;
 11
/// Token counts broken down by category.
///
/// Used both for the tokens consumed by a single request (the `tokens`
/// argument of `record_usage`) and for aggregated totals
/// (`Usage::tokens_this_month`).
#[derive(Debug, PartialEq, Clone, Copy, Default, serde::Serialize)]
pub struct TokenUsage {
    /// Input tokens.
    pub input: usize,
    /// Input tokens counted under "cache creation".
    pub input_cache_creation: usize,
    /// Input tokens counted under "cache read".
    pub input_cache_read: usize,
    /// Output tokens.
    pub output: usize,
}
 19
 20impl TokenUsage {
 21    pub fn total(&self) -> usize {
 22        self.input + self.input_cache_creation + self.input_cache_read + self.output
 23    }
 24}
 25
/// A user's usage figures for one model, as returned by `get_usage` and
/// `record_usage`.
#[derive(Debug, PartialEq, Clone, Copy, serde::Serialize)]
pub struct Usage {
    /// Sum of the live buckets of the `RequestsPerMinute` measure.
    pub requests_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerMinute` measure.
    pub tokens_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerDay` measure.
    pub tokens_this_day: usize,
    /// Token totals stored in the monthly usage row for the current
    /// month and year (all zero when no such row exists).
    pub tokens_this_month: TokenUsage,
    /// Spending computed from the monthly token totals and the model's prices.
    pub spending_this_month: Cents,
    /// Spending computed from the lifetime token totals and the model's prices.
    pub lifetime_spending: Cents,
}
 35
/// Usage of a single model aggregated over all non-staff users, as produced
/// by `get_application_wide_usages_by_model`.
#[derive(Debug, PartialEq, Clone)]
pub struct ApplicationWideUsage {
    /// Provider of the model.
    pub provider: LanguageModelProvider,
    /// Name of the model.
    pub model: String,
    /// Sum of the live `RequestsPerMinute` buckets across the matching usage rows.
    pub requests_this_minute: usize,
    /// Sum of the live `TokensPerMinute` buckets across the matching usage rows.
    pub tokens_this_minute: usize,
}
 43
/// Counts of recently active users for one model, as produced by
/// `get_active_user_count`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ActiveUserCount {
    /// Users with a non-staff `TokensPerMinute` usage row whose timestamp is
    /// within the last 5 minutes.
    pub users_in_recent_minutes: usize,
    /// Users with a non-staff `TokensPerMinute` usage row whose timestamp is
    /// within the last 5 days.
    pub users_in_recent_days: usize,
}
 49
 50impl LlmDatabase {
 51    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 52        let all_measures = self
 53            .transaction(|tx| async move {
 54                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 55
 56                let new_measures = UsageMeasure::iter()
 57                    .filter(|measure| {
 58                        !existing_measures
 59                            .iter()
 60                            .any(|m| m.name == measure.to_string())
 61                    })
 62                    .map(|measure| usage_measure::ActiveModel {
 63                        name: ActiveValue::set(measure.to_string()),
 64                        ..Default::default()
 65                    })
 66                    .collect::<Vec<_>>();
 67
 68                if !new_measures.is_empty() {
 69                    usage_measure::Entity::insert_many(new_measures)
 70                        .exec(&*tx)
 71                        .await?;
 72                }
 73
 74                Ok(usage_measure::Entity::find().all(&*tx).await?)
 75            })
 76            .await?;
 77
 78        self.usage_measure_ids = all_measures
 79            .into_iter()
 80            .filter_map(|measure| {
 81                UsageMeasure::from_str(&measure.name)
 82                    .ok()
 83                    .map(|um| (um, measure.id))
 84            })
 85            .collect();
 86        Ok(())
 87    }
 88
 89    pub async fn get_application_wide_usages_by_model(
 90        &self,
 91        now: DateTimeUtc,
 92    ) -> Result<Vec<ApplicationWideUsage>> {
 93        self.transaction(|tx| async move {
 94            let past_minute = now - Duration::minutes(1);
 95            let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
 96            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
 97
 98            let mut results = Vec::new();
 99            for ((provider, model_name), model) in self.models.iter() {
100                let mut usages = usage::Entity::find()
101                    .filter(
102                        usage::Column::Timestamp
103                            .gte(past_minute.naive_utc())
104                            .and(usage::Column::IsStaff.eq(false))
105                            .and(usage::Column::ModelId.eq(model.id))
106                            .and(
107                                usage::Column::MeasureId
108                                    .eq(requests_per_minute)
109                                    .or(usage::Column::MeasureId.eq(tokens_per_minute)),
110                            ),
111                    )
112                    .stream(&*tx)
113                    .await?;
114
115                let mut requests_this_minute = 0;
116                let mut tokens_this_minute = 0;
117                while let Some(usage) = usages.next().await {
118                    let usage = usage?;
119                    if usage.measure_id == requests_per_minute {
120                        requests_this_minute += Self::get_live_buckets(
121                            &usage,
122                            now.naive_utc(),
123                            UsageMeasure::RequestsPerMinute,
124                        )
125                        .0
126                        .iter()
127                        .copied()
128                        .sum::<i64>() as usize;
129                    } else if usage.measure_id == tokens_per_minute {
130                        tokens_this_minute += Self::get_live_buckets(
131                            &usage,
132                            now.naive_utc(),
133                            UsageMeasure::TokensPerMinute,
134                        )
135                        .0
136                        .iter()
137                        .copied()
138                        .sum::<i64>() as usize;
139                    }
140                }
141
142                results.push(ApplicationWideUsage {
143                    provider: *provider,
144                    model: model_name.clone(),
145                    requests_this_minute,
146                    tokens_this_minute,
147                })
148            }
149
150            Ok(results)
151        })
152        .await
153    }
154
155    pub async fn get_user_spending_for_month(
156        &self,
157        user_id: UserId,
158        now: DateTimeUtc,
159    ) -> Result<Cents> {
160        self.transaction(|tx| async move {
161            let month = now.date_naive().month() as i32;
162            let year = now.date_naive().year();
163
164            let mut monthly_usages = monthly_usage::Entity::find()
165                .filter(
166                    monthly_usage::Column::UserId
167                        .eq(user_id)
168                        .and(monthly_usage::Column::Month.eq(month))
169                        .and(monthly_usage::Column::Year.eq(year)),
170                )
171                .stream(&*tx)
172                .await?;
173            let mut monthly_spending = Cents::ZERO;
174
175            while let Some(usage) = monthly_usages.next().await {
176                let usage = usage?;
177                let Ok(model) = self.model_by_id(usage.model_id) else {
178                    continue;
179                };
180
181                monthly_spending += calculate_spending(
182                    model,
183                    usage.input_tokens as usize,
184                    usage.cache_creation_input_tokens as usize,
185                    usage.cache_read_input_tokens as usize,
186                    usage.output_tokens as usize,
187                );
188            }
189
190            Ok(monthly_spending)
191        })
192        .await
193    }
194
195    pub async fn get_usage(
196        &self,
197        user_id: UserId,
198        provider: LanguageModelProvider,
199        model_name: &str,
200        now: DateTimeUtc,
201    ) -> Result<Usage> {
202        self.transaction(|tx| async move {
203            let model = self
204                .models
205                .get(&(provider, model_name.to_string()))
206                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
207
208            let usages = usage::Entity::find()
209                .filter(
210                    usage::Column::UserId
211                        .eq(user_id)
212                        .and(usage::Column::ModelId.eq(model.id)),
213                )
214                .all(&*tx)
215                .await?;
216
217            let month = now.date_naive().month() as i32;
218            let year = now.date_naive().year();
219            let monthly_usage = monthly_usage::Entity::find()
220                .filter(
221                    monthly_usage::Column::UserId
222                        .eq(user_id)
223                        .and(monthly_usage::Column::ModelId.eq(model.id))
224                        .and(monthly_usage::Column::Month.eq(month))
225                        .and(monthly_usage::Column::Year.eq(year)),
226                )
227                .one(&*tx)
228                .await?;
229            let lifetime_usage = lifetime_usage::Entity::find()
230                .filter(
231                    lifetime_usage::Column::UserId
232                        .eq(user_id)
233                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
234                )
235                .one(&*tx)
236                .await?;
237
238            let requests_this_minute =
239                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
240            let tokens_this_minute =
241                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
242            let tokens_this_day =
243                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
244            let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
245                calculate_spending(
246                    model,
247                    monthly_usage.input_tokens as usize,
248                    monthly_usage.cache_creation_input_tokens as usize,
249                    monthly_usage.cache_read_input_tokens as usize,
250                    monthly_usage.output_tokens as usize,
251                )
252            } else {
253                Cents::ZERO
254            };
255            let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
256                calculate_spending(
257                    model,
258                    lifetime_usage.input_tokens as usize,
259                    lifetime_usage.cache_creation_input_tokens as usize,
260                    lifetime_usage.cache_read_input_tokens as usize,
261                    lifetime_usage.output_tokens as usize,
262                )
263            } else {
264                Cents::ZERO
265            };
266
267            Ok(Usage {
268                requests_this_minute,
269                tokens_this_minute,
270                tokens_this_day,
271                tokens_this_month: TokenUsage {
272                    input: monthly_usage
273                        .as_ref()
274                        .map_or(0, |usage| usage.input_tokens as usize),
275                    input_cache_creation: monthly_usage
276                        .as_ref()
277                        .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
278                    input_cache_read: monthly_usage
279                        .as_ref()
280                        .map_or(0, |usage| usage.cache_read_input_tokens as usize),
281                    output: monthly_usage
282                        .as_ref()
283                        .map_or(0, |usage| usage.output_tokens as usize),
284                },
285                spending_this_month,
286                lifetime_spending,
287            })
288        })
289        .await
290    }
291
    /// Records the token usage of one request made by `user_id` against the
    /// given model and returns the user's updated usage figures.
    ///
    /// All of the following happens inside a single `self.transaction` call:
    ///
    /// 1. The `RequestsPerMinute` measure is bumped by 1, and the
    ///    `TokensPerMinute` and `TokensPerDay` measures by `tokens.total()`.
    /// 2. The monthly usage row for the month/year of `now` is updated, or
    ///    inserted if it does not exist yet.
    /// 3. A billing event carrying this request's token counts is inserted
    ///    when the user is not staff, has an LLM subscription, has exceeded
    ///    `free_tier_monthly_spending_limit` this month, and the excess does
    ///    not exceed `max_monthly_spend`.
    /// 4. The lifetime usage row is updated, or inserted if it does not
    ///    exist yet.
    ///
    /// # Errors
    ///
    /// Fails if the model cannot be resolved, if a usage measure is not
    /// registered, or if any database operation fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_usage(
        &self,
        user_id: UserId,
        is_staff: bool,
        provider: LanguageModelProvider,
        model_name: &str,
        tokens: TokenUsage,
        has_llm_subscription: bool,
        max_monthly_spend: Cents,
        free_tier_monthly_spending_limit: Cents,
        now: DateTimeUtc,
    ) -> Result<Usage> {
        self.transaction(|tx| async move {
            let model = self.model(provider, model_name)?;

            // Existing rolling-window rows (one per measure) for this user and model.
            let usages = usage::Entity::find()
                .filter(
                    usage::Column::UserId
                        .eq(user_id)
                        .and(usage::Column::ModelId.eq(model.id)),
                )
                .all(&*tx)
                .await?;

            // Each call writes the updated row and returns the new window total.
            let requests_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::RequestsPerMinute,
                    now,
                    1,
                    &tx,
                )
                .await?;
            let tokens_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerMinute,
                    now,
                    tokens.total(),
                    &tx,
                )
                .await?;
            let tokens_this_day = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerDay,
                    now,
                    tokens.total(),
                    &tx,
                )
                .await?;

            let month = now.date_naive().month() as i32;
            let year = now.date_naive().year();

            // Update monthly usage
            let monthly_usage = monthly_usage::Entity::find()
                .filter(
                    monthly_usage::Column::UserId
                        .eq(user_id)
                        .and(monthly_usage::Column::ModelId.eq(model.id))
                        .and(monthly_usage::Column::Month.eq(month))
                        .and(monthly_usage::Column::Year.eq(year)),
                )
                .one(&*tx)
                .await?;

            // Add this request's tokens to the existing row, or start a new
            // row for this month. Either way `monthly_usage` ends up holding
            // the stored totals including this request.
            let monthly_usage = match monthly_usage {
                Some(usage) => {
                    monthly_usage::Entity::update(monthly_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
                        ),
                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    monthly_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        month: ActiveValue::set(month),
                        year: ActiveValue::set(year),
                        input_tokens: ActiveValue::set(tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                        output_tokens: ActiveValue::set(tokens.output as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            let spending_this_month = calculate_spending(
                model,
                monthly_usage.input_tokens as usize,
                monthly_usage.cache_creation_input_tokens as usize,
                monthly_usage.cache_read_input_tokens as usize,
                monthly_usage.output_tokens as usize,
            );

            // Emit a billing event only for subscribed, non-staff users who
            // are past the free tier but still within their spending cap.
            // The subtraction is only evaluated after the `>` check passed.
            if !is_staff
                && spending_this_month > free_tier_monthly_spending_limit
                && has_llm_subscription
                && (spending_this_month - free_tier_monthly_spending_limit) <= max_monthly_spend
            {
                billing_event::ActiveModel {
                    id: ActiveValue::not_set(),
                    idempotency_key: ActiveValue::not_set(),
                    user_id: ActiveValue::set(user_id),
                    model_id: ActiveValue::set(model.id),
                    input_tokens: ActiveValue::set(tokens.input as i64),
                    input_cache_creation_tokens: ActiveValue::set(
                        tokens.input_cache_creation as i64,
                    ),
                    input_cache_read_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                    output_tokens: ActiveValue::set(tokens.output as i64),
                }
                .insert(&*tx)
                .await?;
            }

            // Update lifetime usage
            let lifetime_usage = lifetime_usage::Entity::find()
                .filter(
                    lifetime_usage::Column::UserId
                        .eq(user_id)
                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
                )
                .one(&*tx)
                .await?;

            // Same update-or-insert scheme as for the monthly row above.
            let lifetime_usage = match lifetime_usage {
                Some(usage) => {
                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
                        ),
                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    lifetime_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        input_tokens: ActiveValue::set(tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                        output_tokens: ActiveValue::set(tokens.output as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            let lifetime_spending = calculate_spending(
                model,
                lifetime_usage.input_tokens as usize,
                lifetime_usage.cache_creation_input_tokens as usize,
                lifetime_usage.cache_read_input_tokens as usize,
                lifetime_usage.output_tokens as usize,
            );

            Ok(Usage {
                requests_this_minute,
                tokens_this_minute,
                tokens_this_day,
                tokens_this_month: TokenUsage {
                    input: monthly_usage.input_tokens as usize,
                    input_cache_creation: monthly_usage.cache_creation_input_tokens as usize,
                    input_cache_read: monthly_usage.cache_read_input_tokens as usize,
                    output: monthly_usage.output_tokens as usize,
                },
                spending_this_month,
                lifetime_spending,
            })
        })
        .await
    }
502
503    /// Returns the active user count for the specified model.
504    pub async fn get_active_user_count(
505        &self,
506        provider: LanguageModelProvider,
507        model_name: &str,
508        now: DateTimeUtc,
509    ) -> Result<ActiveUserCount> {
510        self.transaction(|tx| async move {
511            let minute_since = now - Duration::minutes(5);
512            let day_since = now - Duration::days(5);
513
514            let model = self
515                .models
516                .get(&(provider, model_name.to_string()))
517                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
518
519            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
520
521            let users_in_recent_minutes = usage::Entity::find()
522                .filter(
523                    usage::Column::ModelId
524                        .eq(model.id)
525                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
526                        .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
527                        .and(usage::Column::IsStaff.eq(false)),
528                )
529                .select_only()
530                .column(usage::Column::UserId)
531                .group_by(usage::Column::UserId)
532                .count(&*tx)
533                .await? as usize;
534
535            let users_in_recent_days = usage::Entity::find()
536                .filter(
537                    usage::Column::ModelId
538                        .eq(model.id)
539                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
540                        .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
541                        .and(usage::Column::IsStaff.eq(false)),
542                )
543                .select_only()
544                .column(usage::Column::UserId)
545                .group_by(usage::Column::UserId)
546                .count(&*tx)
547                .await? as usize;
548
549            Ok(ActiveUserCount {
550                users_in_recent_minutes,
551                users_in_recent_days,
552            })
553        })
554        .await
555    }
556
557    #[allow(clippy::too_many_arguments)]
558    async fn update_usage_for_measure(
559        &self,
560        user_id: UserId,
561        is_staff: bool,
562        model_id: ModelId,
563        usages: &[usage::Model],
564        usage_measure: UsageMeasure,
565        now: DateTimeUtc,
566        usage_to_add: usize,
567        tx: &DatabaseTransaction,
568    ) -> Result<usize> {
569        let now = now.naive_utc();
570        let measure_id = *self
571            .usage_measure_ids
572            .get(&usage_measure)
573            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
574
575        let mut id = None;
576        let mut timestamp = now;
577        let mut buckets = vec![0_i64];
578
579        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
580            id = Some(old_usage.id);
581            let (live_buckets, buckets_since) =
582                Self::get_live_buckets(old_usage, now, usage_measure);
583            if !live_buckets.is_empty() {
584                buckets.clear();
585                buckets.extend_from_slice(live_buckets);
586                buckets.extend(iter::repeat(0).take(buckets_since));
587                timestamp =
588                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
589            }
590        }
591
592        *buckets.last_mut().unwrap() += usage_to_add as i64;
593        let total_usage = buckets.iter().sum::<i64>() as usize;
594
595        let mut model = usage::ActiveModel {
596            user_id: ActiveValue::set(user_id),
597            is_staff: ActiveValue::set(is_staff),
598            model_id: ActiveValue::set(model_id),
599            measure_id: ActiveValue::set(measure_id),
600            timestamp: ActiveValue::set(timestamp),
601            buckets: ActiveValue::set(buckets),
602            ..Default::default()
603        };
604
605        if let Some(id) = id {
606            model.id = ActiveValue::unchanged(id);
607            model.update(tx).await?;
608        } else {
609            usage::Entity::insert(model)
610                .exec_without_returning(tx)
611                .await?;
612        }
613
614        Ok(total_usage)
615    }
616
617    fn get_usage_for_measure(
618        &self,
619        usages: &[usage::Model],
620        now: DateTimeUtc,
621        usage_measure: UsageMeasure,
622    ) -> Result<usize> {
623        let now = now.naive_utc();
624        let measure_id = *self
625            .usage_measure_ids
626            .get(&usage_measure)
627            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
628        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
629            return Ok(0);
630        };
631
632        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
633        Ok(live_buckets.iter().sum::<i64>() as _)
634    }
635
636    fn get_live_buckets(
637        usage: &usage::Model,
638        now: chrono::NaiveDateTime,
639        measure: UsageMeasure,
640    ) -> (&[i64], usize) {
641        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
642        let buckets_since_usage =
643            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
644        let buckets_since_usage = buckets_since_usage.ceil() as usize;
645        let mut live_buckets = &[] as &[i64];
646        if buckets_since_usage < measure.bucket_count() {
647            let expired_bucket_count =
648                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
649            live_buckets = &usage.buckets[expired_bucket_count..];
650            while live_buckets.first() == Some(&0) {
651                live_buckets = &live_buckets[1..];
652            }
653        }
654        (live_buckets, buckets_since_usage)
655    }
656}
657
658fn calculate_spending(
659    model: &model::Model,
660    input_tokens_this_month: usize,
661    cache_creation_input_tokens_this_month: usize,
662    cache_read_input_tokens_this_month: usize,
663    output_tokens_this_month: usize,
664) -> Cents {
665    let input_token_cost =
666        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
667    let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
668        * model.price_per_million_cache_creation_input_tokens as usize
669        / 1_000_000;
670    let cache_read_input_token_cost = cache_read_input_tokens_this_month
671        * model.price_per_million_cache_read_input_tokens as usize
672        / 1_000_000;
673    let output_token_cost =
674        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
675    let spending = input_token_cost
676        + cache_creation_input_token_cost
677        + cache_read_input_token_cost
678        + output_token_cost;
679    Cents::new(spending as u32)
680}
681
/// Number of buckets the one-minute measures are divided into
/// (1 minute / 12 = 5-second buckets).
const MINUTE_BUCKET_COUNT: usize = 12;
/// Number of buckets the per-day measure is divided into
/// (24 hours / 48 = 30-minute buckets).
const DAY_BUCKET_COUNT: usize = 48;
684
685impl UsageMeasure {
686    fn bucket_count(&self) -> usize {
687        match self {
688            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
689            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
690            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
691        }
692    }
693
694    fn total_duration(&self) -> Duration {
695        match self {
696            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
697            UsageMeasure::TokensPerMinute => Duration::minutes(1),
698            UsageMeasure::TokensPerDay => Duration::hours(24),
699        }
700    }
701
702    fn bucket_duration(&self) -> Duration {
703        self.total_duration() / self.bucket_count() as i32
704    }
705}