usages.rs

  1use crate::db::UserId;
  2use chrono::{Datelike, Duration};
  3use futures::StreamExt as _;
  4use rpc::LanguageModelProvider;
  5use sea_orm::QuerySelect;
  6use std::{iter, str::FromStr};
  7use strum::IntoEnumIterator as _;
  8
  9use super::*;
 10
/// A snapshot of one user's usage of one model, as produced by
/// `LlmDatabase::get_usage` and `LlmDatabase::record_usage`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Usage {
    /// Sum of the still-live `RequestsPerMinute` buckets.
    pub requests_this_minute: usize,
    /// Sum of the still-live `TokensPerMinute` buckets.
    pub tokens_this_minute: usize,
    /// Sum of the still-live `TokensPerDay` buckets.
    pub tokens_this_day: usize,
    /// Input tokens stored on the user's monthly usage row (0 when there is no row).
    pub input_tokens_this_month: usize,
    /// Cache-creation input tokens stored on the monthly usage row (0 when there is no row).
    pub cache_creation_input_tokens_this_month: usize,
    /// Cache-read input tokens stored on the monthly usage row (0 when there is no row).
    pub cache_read_input_tokens_this_month: usize,
    /// Output tokens stored on the monthly usage row (0 when there is no row).
    pub output_tokens_this_month: usize,
    /// `calculate_spending` applied to the monthly token counts.
    /// NOTE(review): presumably in cents, going by the `monthly_spending_in_cents`
    /// local in `get_user_spending_for_month` — confirm.
    pub spending_this_month: usize,
    /// `calculate_spending` applied to the lifetime token counts (same unit as
    /// `spending_this_month`).
    pub lifetime_spending: usize,
}
 23
/// Aggregate per-minute usage of a single model across all non-staff users, as produced
/// by `LlmDatabase::get_application_wide_usages_by_model`.
#[derive(Debug, PartialEq, Clone)]
pub struct ApplicationWideUsage {
    /// Provider half of the model's key in `LlmDatabase::models`.
    pub provider: LanguageModelProvider,
    /// Model-name half of the model's key in `LlmDatabase::models`.
    pub model: String,
    /// Sum of live `RequestsPerMinute` buckets over all matching usage rows.
    pub requests_this_minute: usize,
    /// Sum of live `TokensPerMinute` buckets over all matching usage rows.
    pub tokens_this_minute: usize,
}
 31
/// Counts of recently active users for one model, as produced by
/// `LlmDatabase::get_active_user_count`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ActiveUserCount {
    /// Non-staff users with a `TokensPerMinute` usage row timestamped in the last 5 minutes.
    pub users_in_recent_minutes: usize,
    /// Non-staff users with a `TokensPerMinute` usage row timestamped in the last 5 days.
    pub users_in_recent_days: usize,
}
 37
 38impl LlmDatabase {
 39    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 40        let all_measures = self
 41            .transaction(|tx| async move {
 42                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 43
 44                let new_measures = UsageMeasure::iter()
 45                    .filter(|measure| {
 46                        !existing_measures
 47                            .iter()
 48                            .any(|m| m.name == measure.to_string())
 49                    })
 50                    .map(|measure| usage_measure::ActiveModel {
 51                        name: ActiveValue::set(measure.to_string()),
 52                        ..Default::default()
 53                    })
 54                    .collect::<Vec<_>>();
 55
 56                if !new_measures.is_empty() {
 57                    usage_measure::Entity::insert_many(new_measures)
 58                        .exec(&*tx)
 59                        .await?;
 60                }
 61
 62                Ok(usage_measure::Entity::find().all(&*tx).await?)
 63            })
 64            .await?;
 65
 66        self.usage_measure_ids = all_measures
 67            .into_iter()
 68            .filter_map(|measure| {
 69                UsageMeasure::from_str(&measure.name)
 70                    .ok()
 71                    .map(|um| (um, measure.id))
 72            })
 73            .collect();
 74        Ok(())
 75    }
 76
 77    pub async fn get_application_wide_usages_by_model(
 78        &self,
 79        now: DateTimeUtc,
 80    ) -> Result<Vec<ApplicationWideUsage>> {
 81        self.transaction(|tx| async move {
 82            let past_minute = now - Duration::minutes(1);
 83            let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
 84            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
 85
 86            let mut results = Vec::new();
 87            for ((provider, model_name), model) in self.models.iter() {
 88                let mut usages = usage::Entity::find()
 89                    .filter(
 90                        usage::Column::Timestamp
 91                            .gte(past_minute.naive_utc())
 92                            .and(usage::Column::IsStaff.eq(false))
 93                            .and(usage::Column::ModelId.eq(model.id))
 94                            .and(
 95                                usage::Column::MeasureId
 96                                    .eq(requests_per_minute)
 97                                    .or(usage::Column::MeasureId.eq(tokens_per_minute)),
 98                            ),
 99                    )
100                    .stream(&*tx)
101                    .await?;
102
103                let mut requests_this_minute = 0;
104                let mut tokens_this_minute = 0;
105                while let Some(usage) = usages.next().await {
106                    let usage = usage?;
107                    if usage.measure_id == requests_per_minute {
108                        requests_this_minute += Self::get_live_buckets(
109                            &usage,
110                            now.naive_utc(),
111                            UsageMeasure::RequestsPerMinute,
112                        )
113                        .0
114                        .iter()
115                        .copied()
116                        .sum::<i64>() as usize;
117                    } else if usage.measure_id == tokens_per_minute {
118                        tokens_this_minute += Self::get_live_buckets(
119                            &usage,
120                            now.naive_utc(),
121                            UsageMeasure::TokensPerMinute,
122                        )
123                        .0
124                        .iter()
125                        .copied()
126                        .sum::<i64>() as usize;
127                    }
128                }
129
130                results.push(ApplicationWideUsage {
131                    provider: *provider,
132                    model: model_name.clone(),
133                    requests_this_minute,
134                    tokens_this_minute,
135                })
136            }
137
138            Ok(results)
139        })
140        .await
141    }
142
143    pub async fn get_user_spending_for_month(
144        &self,
145        user_id: UserId,
146        now: DateTimeUtc,
147    ) -> Result<usize> {
148        self.transaction(|tx| async move {
149            let month = now.date_naive().month() as i32;
150            let year = now.date_naive().year();
151
152            let mut monthly_usages = monthly_usage::Entity::find()
153                .filter(
154                    monthly_usage::Column::UserId
155                        .eq(user_id)
156                        .and(monthly_usage::Column::Month.eq(month))
157                        .and(monthly_usage::Column::Year.eq(year)),
158                )
159                .stream(&*tx)
160                .await?;
161            let mut monthly_spending_in_cents = 0;
162
163            while let Some(usage) = monthly_usages.next().await {
164                let usage = usage?;
165                let Ok(model) = self.model_by_id(usage.model_id) else {
166                    continue;
167                };
168
169                monthly_spending_in_cents += calculate_spending(
170                    model,
171                    usage.input_tokens as usize,
172                    usage.cache_creation_input_tokens as usize,
173                    usage.cache_read_input_tokens as usize,
174                    usage.output_tokens as usize,
175                );
176            }
177
178            Ok(monthly_spending_in_cents)
179        })
180        .await
181    }
182
183    pub async fn get_usage(
184        &self,
185        user_id: UserId,
186        provider: LanguageModelProvider,
187        model_name: &str,
188        now: DateTimeUtc,
189    ) -> Result<Usage> {
190        self.transaction(|tx| async move {
191            let model = self
192                .models
193                .get(&(provider, model_name.to_string()))
194                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
195
196            let usages = usage::Entity::find()
197                .filter(
198                    usage::Column::UserId
199                        .eq(user_id)
200                        .and(usage::Column::ModelId.eq(model.id)),
201                )
202                .all(&*tx)
203                .await?;
204
205            let month = now.date_naive().month() as i32;
206            let year = now.date_naive().year();
207            let monthly_usage = monthly_usage::Entity::find()
208                .filter(
209                    monthly_usage::Column::UserId
210                        .eq(user_id)
211                        .and(monthly_usage::Column::ModelId.eq(model.id))
212                        .and(monthly_usage::Column::Month.eq(month))
213                        .and(monthly_usage::Column::Year.eq(year)),
214                )
215                .one(&*tx)
216                .await?;
217            let lifetime_usage = lifetime_usage::Entity::find()
218                .filter(
219                    lifetime_usage::Column::UserId
220                        .eq(user_id)
221                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
222                )
223                .one(&*tx)
224                .await?;
225
226            let requests_this_minute =
227                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
228            let tokens_this_minute =
229                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
230            let tokens_this_day =
231                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
232            let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
233                calculate_spending(
234                    model,
235                    monthly_usage.input_tokens as usize,
236                    monthly_usage.cache_creation_input_tokens as usize,
237                    monthly_usage.cache_read_input_tokens as usize,
238                    monthly_usage.output_tokens as usize,
239                )
240            } else {
241                0
242            };
243            let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
244                calculate_spending(
245                    model,
246                    lifetime_usage.input_tokens as usize,
247                    lifetime_usage.cache_creation_input_tokens as usize,
248                    lifetime_usage.cache_read_input_tokens as usize,
249                    lifetime_usage.output_tokens as usize,
250                )
251            } else {
252                0
253            };
254
255            Ok(Usage {
256                requests_this_minute,
257                tokens_this_minute,
258                tokens_this_day,
259                input_tokens_this_month: monthly_usage
260                    .as_ref()
261                    .map_or(0, |usage| usage.input_tokens as usize),
262                cache_creation_input_tokens_this_month: monthly_usage
263                    .as_ref()
264                    .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
265                cache_read_input_tokens_this_month: monthly_usage
266                    .as_ref()
267                    .map_or(0, |usage| usage.cache_read_input_tokens as usize),
268                output_tokens_this_month: monthly_usage
269                    .as_ref()
270                    .map_or(0, |usage| usage.output_tokens as usize),
271                spending_this_month,
272                lifetime_spending,
273            })
274        })
275        .await
276    }
277
    /// Records one request by `user_id` against the given model and returns the
    /// resulting [`Usage`].
    ///
    /// Within a single `self.transaction` call this:
    /// - adds `1` to the `RequestsPerMinute` measure,
    /// - adds the sum of all four token counts to `TokensPerMinute` and `TokensPerDay`,
    /// - adds each token count to the user's monthly usage row for the month/year of
    ///   `now` and to the lifetime usage row, inserting either row if it is missing.
    ///
    /// # Errors
    ///
    /// Returns an error if `self.model(provider, model_name)` fails, if a usage measure
    /// id is missing, or if a database operation fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_usage(
        &self,
        user_id: UserId,
        is_staff: bool,
        provider: LanguageModelProvider,
        model_name: &str,
        input_token_count: usize,
        cache_creation_input_tokens: usize,
        cache_read_input_tokens: usize,
        output_token_count: usize,
        now: DateTimeUtc,
    ) -> Result<Usage> {
        self.transaction(|tx| async move {
            let model = self.model(provider, model_name)?;

            // Load the user's rate-limit rows for this model once; the three measure
            // updates below each pick their own row out of this list.
            let usages = usage::Entity::find()
                .filter(
                    usage::Column::UserId
                        .eq(user_id)
                        .and(usage::Column::ModelId.eq(model.id)),
                )
                .all(&*tx)
                .await?;

            // A call to this method counts as exactly one request.
            let requests_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::RequestsPerMinute,
                    now,
                    1,
                    &tx,
                )
                .await?;
            // The token rate limits count every kind of token, cached or not.
            let total_token_count = input_token_count
                + cache_read_input_tokens
                + cache_creation_input_tokens
                + output_token_count;
            let tokens_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerMinute,
                    now,
                    total_token_count,
                    &tx,
                )
                .await?;
            let tokens_this_day = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerDay,
                    now,
                    total_token_count,
                    &tx,
                )
                .await?;

            let month = now.date_naive().month() as i32;
            let year = now.date_naive().year();

            // Update monthly usage: find the row for (user, model, month, year), then
            // either add to it or create it.
            let monthly_usage = monthly_usage::Entity::find()
                .filter(
                    monthly_usage::Column::UserId
                        .eq(user_id)
                        .and(monthly_usage::Column::ModelId.eq(model.id))
                        .and(monthly_usage::Column::Month.eq(month))
                        .and(monthly_usage::Column::Year.eq(year)),
                )
                .one(&*tx)
                .await?;

            // NOTE(review): the new totals are computed here from the row read above
            // rather than with an SQL-side increment; presumably the transaction's
            // isolation level prevents lost updates under concurrency — confirm.
            let monthly_usage = match monthly_usage {
                Some(usage) => {
                    monthly_usage::Entity::update(monthly_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(
                            usage.input_tokens + input_token_count as i64,
                        ),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + cache_creation_input_tokens as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + cache_read_input_tokens as i64,
                        ),
                        output_tokens: ActiveValue::set(
                            usage.output_tokens + output_token_count as i64,
                        ),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    monthly_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        month: ActiveValue::set(month),
                        year: ActiveValue::set(year),
                        input_tokens: ActiveValue::set(input_token_count as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            cache_creation_input_tokens as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(cache_read_input_tokens as i64),
                        output_tokens: ActiveValue::set(output_token_count as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            // Price the month from the row as it stands after the write above.
            let spending_this_month = calculate_spending(
                model,
                monthly_usage.input_tokens as usize,
                monthly_usage.cache_creation_input_tokens as usize,
                monthly_usage.cache_read_input_tokens as usize,
                monthly_usage.output_tokens as usize,
            );

            // Update lifetime usage: same find-then-update-or-insert shape as the
            // monthly row, keyed by (user, model) only.
            let lifetime_usage = lifetime_usage::Entity::find()
                .filter(
                    lifetime_usage::Column::UserId
                        .eq(user_id)
                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
                )
                .one(&*tx)
                .await?;

            let lifetime_usage = match lifetime_usage {
                Some(usage) => {
                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(
                            usage.input_tokens + input_token_count as i64,
                        ),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + cache_creation_input_tokens as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + cache_read_input_tokens as i64,
                        ),
                        output_tokens: ActiveValue::set(
                            usage.output_tokens + output_token_count as i64,
                        ),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    lifetime_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        input_tokens: ActiveValue::set(input_token_count as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            cache_creation_input_tokens as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(cache_read_input_tokens as i64),
                        output_tokens: ActiveValue::set(output_token_count as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            let lifetime_spending = calculate_spending(
                model,
                lifetime_usage.input_tokens as usize,
                lifetime_usage.cache_creation_input_tokens as usize,
                lifetime_usage.cache_read_input_tokens as usize,
                lifetime_usage.output_tokens as usize,
            );

            Ok(Usage {
                requests_this_minute,
                tokens_this_minute,
                tokens_this_day,
                input_tokens_this_month: monthly_usage.input_tokens as usize,
                cache_creation_input_tokens_this_month: monthly_usage.cache_creation_input_tokens
                    as usize,
                cache_read_input_tokens_this_month: monthly_usage.cache_read_input_tokens as usize,
                output_tokens_this_month: monthly_usage.output_tokens as usize,
                spending_this_month,
                lifetime_spending,
            })
        })
        .await
    }
478
479    /// Returns the active user count for the specified model.
480    pub async fn get_active_user_count(
481        &self,
482        provider: LanguageModelProvider,
483        model_name: &str,
484        now: DateTimeUtc,
485    ) -> Result<ActiveUserCount> {
486        self.transaction(|tx| async move {
487            let minute_since = now - Duration::minutes(5);
488            let day_since = now - Duration::days(5);
489
490            let model = self
491                .models
492                .get(&(provider, model_name.to_string()))
493                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
494
495            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
496
497            let users_in_recent_minutes = usage::Entity::find()
498                .filter(
499                    usage::Column::ModelId
500                        .eq(model.id)
501                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
502                        .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
503                        .and(usage::Column::IsStaff.eq(false)),
504                )
505                .select_only()
506                .column(usage::Column::UserId)
507                .group_by(usage::Column::UserId)
508                .count(&*tx)
509                .await? as usize;
510
511            let users_in_recent_days = usage::Entity::find()
512                .filter(
513                    usage::Column::ModelId
514                        .eq(model.id)
515                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
516                        .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
517                        .and(usage::Column::IsStaff.eq(false)),
518                )
519                .select_only()
520                .column(usage::Column::UserId)
521                .group_by(usage::Column::UserId)
522                .count(&*tx)
523                .await? as usize;
524
525            Ok(ActiveUserCount {
526                users_in_recent_minutes,
527                users_in_recent_days,
528            })
529        })
530        .await
531    }
532
533    #[allow(clippy::too_many_arguments)]
534    async fn update_usage_for_measure(
535        &self,
536        user_id: UserId,
537        is_staff: bool,
538        model_id: ModelId,
539        usages: &[usage::Model],
540        usage_measure: UsageMeasure,
541        now: DateTimeUtc,
542        usage_to_add: usize,
543        tx: &DatabaseTransaction,
544    ) -> Result<usize> {
545        let now = now.naive_utc();
546        let measure_id = *self
547            .usage_measure_ids
548            .get(&usage_measure)
549            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
550
551        let mut id = None;
552        let mut timestamp = now;
553        let mut buckets = vec![0_i64];
554
555        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
556            id = Some(old_usage.id);
557            let (live_buckets, buckets_since) =
558                Self::get_live_buckets(old_usage, now, usage_measure);
559            if !live_buckets.is_empty() {
560                buckets.clear();
561                buckets.extend_from_slice(live_buckets);
562                buckets.extend(iter::repeat(0).take(buckets_since));
563                timestamp =
564                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
565            }
566        }
567
568        *buckets.last_mut().unwrap() += usage_to_add as i64;
569        let total_usage = buckets.iter().sum::<i64>() as usize;
570
571        let mut model = usage::ActiveModel {
572            user_id: ActiveValue::set(user_id),
573            is_staff: ActiveValue::set(is_staff),
574            model_id: ActiveValue::set(model_id),
575            measure_id: ActiveValue::set(measure_id),
576            timestamp: ActiveValue::set(timestamp),
577            buckets: ActiveValue::set(buckets),
578            ..Default::default()
579        };
580
581        if let Some(id) = id {
582            model.id = ActiveValue::unchanged(id);
583            model.update(tx).await?;
584        } else {
585            usage::Entity::insert(model)
586                .exec_without_returning(tx)
587                .await?;
588        }
589
590        Ok(total_usage)
591    }
592
593    fn get_usage_for_measure(
594        &self,
595        usages: &[usage::Model],
596        now: DateTimeUtc,
597        usage_measure: UsageMeasure,
598    ) -> Result<usize> {
599        let now = now.naive_utc();
600        let measure_id = *self
601            .usage_measure_ids
602            .get(&usage_measure)
603            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
604        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
605            return Ok(0);
606        };
607
608        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
609        Ok(live_buckets.iter().sum::<i64>() as _)
610    }
611
612    fn get_live_buckets(
613        usage: &usage::Model,
614        now: chrono::NaiveDateTime,
615        measure: UsageMeasure,
616    ) -> (&[i64], usize) {
617        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
618        let buckets_since_usage =
619            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
620        let buckets_since_usage = buckets_since_usage.ceil() as usize;
621        let mut live_buckets = &[] as &[i64];
622        if buckets_since_usage < measure.bucket_count() {
623            let expired_bucket_count =
624                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
625            live_buckets = &usage.buckets[expired_bucket_count..];
626            while live_buckets.first() == Some(&0) {
627                live_buckets = &live_buckets[1..];
628            }
629        }
630        (live_buckets, buckets_since_usage)
631    }
632}
633
634fn calculate_spending(
635    model: &model::Model,
636    input_tokens_this_month: usize,
637    cache_creation_input_tokens_this_month: usize,
638    cache_read_input_tokens_this_month: usize,
639    output_tokens_this_month: usize,
640) -> usize {
641    let input_token_cost =
642        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
643    let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
644        * model.price_per_million_cache_creation_input_tokens as usize
645        / 1_000_000;
646    let cache_read_input_token_cost = cache_read_input_tokens_this_month
647        * model.price_per_million_cache_read_input_tokens as usize
648        / 1_000_000;
649    let output_token_cost =
650        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
651    input_token_cost
652        + cache_creation_input_token_cost
653        + cache_read_input_token_cost
654        + output_token_cost
655}
656
657const MINUTE_BUCKET_COUNT: usize = 12;
658const DAY_BUCKET_COUNT: usize = 48;
659
660impl UsageMeasure {
661    fn bucket_count(&self) -> usize {
662        match self {
663            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
664            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
665            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
666        }
667    }
668
669    fn total_duration(&self) -> Duration {
670        match self {
671            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
672            UsageMeasure::TokensPerMinute => Duration::minutes(1),
673            UsageMeasure::TokensPerDay => Duration::hours(24),
674        }
675    }
676
677    fn bucket_duration(&self) -> Duration {
678        self.total_duration() / self.bucket_count() as i32
679    }
680}