1use crate::db::UserId;
2use crate::llm::Cents;
3use chrono::{Datelike, Duration};
4use futures::StreamExt as _;
5use rpc::LanguageModelProvider;
6use sea_orm::QuerySelect;
7use std::{iter, str::FromStr};
8use strum::IntoEnumIterator as _;
9
10use super::*;
11
/// Token counts for a request or an aggregated period, broken down by the
/// categories that `calculate_spending` prices separately.
#[derive(Debug, PartialEq, Clone, Copy, Default, serde::Serialize)]
pub struct TokenUsage {
    /// Input tokens billed at the regular input price.
    pub input: usize,
    /// Input tokens billed at the cache-creation input price.
    pub input_cache_creation: usize,
    /// Input tokens billed at the cache-read input price.
    pub input_cache_read: usize,
    /// Output tokens billed at the output price.
    pub output: usize,
}
19
20impl TokenUsage {
21 pub fn total(&self) -> usize {
22 self.input + self.input_cache_creation + self.input_cache_read + self.output
23 }
24}
25
/// A snapshot of one user's usage of one model, as returned by
/// `LlmDatabase::get_usage` and `LlmDatabase::record_usage`.
#[derive(Debug, PartialEq, Clone, Copy, serde::Serialize)]
pub struct Usage {
    /// Requests counted in the sliding one-minute window.
    pub requests_this_minute: usize,
    /// Tokens counted in the sliding one-minute window.
    pub tokens_this_minute: usize,
    /// Tokens counted in the sliding 24-hour window.
    pub tokens_this_day: usize,
    /// Token totals for the calendar month (UTC) containing the query time.
    pub tokens_this_month: TokenUsage,
    /// Spending computed from `tokens_this_month` at the model's prices.
    pub spending_this_month: Cents,
    /// Spending computed from the user's lifetime usage row for the model.
    pub lifetime_spending: Cents,
}
35
/// Usage of a single model summed over all non-staff users during the most
/// recent minute, as produced by `get_application_wide_usages_by_model`.
#[derive(Debug, PartialEq, Clone)]
pub struct ApplicationWideUsage {
    /// Provider of the model.
    pub provider: LanguageModelProvider,
    /// Model name, as keyed in the database's model map.
    pub model: String,
    /// Requests still inside the one-minute window.
    pub requests_this_minute: usize,
    /// Tokens still inside the one-minute window.
    pub tokens_this_minute: usize,
}
43
/// Number of distinct non-staff users of a model who were recently active,
/// as computed by `get_active_user_count`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ActiveUserCount {
    /// Users whose tokens-per-minute usage row was updated in the last 5 minutes.
    pub users_in_recent_minutes: usize,
    /// Users whose tokens-per-minute usage row was updated in the last 5 days.
    pub users_in_recent_days: usize,
}
49
50impl LlmDatabase {
51 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
52 let all_measures = self
53 .transaction(|tx| async move {
54 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
55
56 let new_measures = UsageMeasure::iter()
57 .filter(|measure| {
58 !existing_measures
59 .iter()
60 .any(|m| m.name == measure.to_string())
61 })
62 .map(|measure| usage_measure::ActiveModel {
63 name: ActiveValue::set(measure.to_string()),
64 ..Default::default()
65 })
66 .collect::<Vec<_>>();
67
68 if !new_measures.is_empty() {
69 usage_measure::Entity::insert_many(new_measures)
70 .exec(&*tx)
71 .await?;
72 }
73
74 Ok(usage_measure::Entity::find().all(&*tx).await?)
75 })
76 .await?;
77
78 self.usage_measure_ids = all_measures
79 .into_iter()
80 .filter_map(|measure| {
81 UsageMeasure::from_str(&measure.name)
82 .ok()
83 .map(|um| (um, measure.id))
84 })
85 .collect();
86 Ok(())
87 }
88
89 pub async fn get_application_wide_usages_by_model(
90 &self,
91 now: DateTimeUtc,
92 ) -> Result<Vec<ApplicationWideUsage>> {
93 self.transaction(|tx| async move {
94 let past_minute = now - Duration::minutes(1);
95 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
96 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
97
98 let mut results = Vec::new();
99 for ((provider, model_name), model) in self.models.iter() {
100 let mut usages = usage::Entity::find()
101 .filter(
102 usage::Column::Timestamp
103 .gte(past_minute.naive_utc())
104 .and(usage::Column::IsStaff.eq(false))
105 .and(usage::Column::ModelId.eq(model.id))
106 .and(
107 usage::Column::MeasureId
108 .eq(requests_per_minute)
109 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
110 ),
111 )
112 .stream(&*tx)
113 .await?;
114
115 let mut requests_this_minute = 0;
116 let mut tokens_this_minute = 0;
117 while let Some(usage) = usages.next().await {
118 let usage = usage?;
119 if usage.measure_id == requests_per_minute {
120 requests_this_minute += Self::get_live_buckets(
121 &usage,
122 now.naive_utc(),
123 UsageMeasure::RequestsPerMinute,
124 )
125 .0
126 .iter()
127 .copied()
128 .sum::<i64>() as usize;
129 } else if usage.measure_id == tokens_per_minute {
130 tokens_this_minute += Self::get_live_buckets(
131 &usage,
132 now.naive_utc(),
133 UsageMeasure::TokensPerMinute,
134 )
135 .0
136 .iter()
137 .copied()
138 .sum::<i64>() as usize;
139 }
140 }
141
142 results.push(ApplicationWideUsage {
143 provider: *provider,
144 model: model_name.clone(),
145 requests_this_minute,
146 tokens_this_minute,
147 })
148 }
149
150 Ok(results)
151 })
152 .await
153 }
154
155 pub async fn get_user_spending_for_month(
156 &self,
157 user_id: UserId,
158 now: DateTimeUtc,
159 ) -> Result<Cents> {
160 self.transaction(|tx| async move {
161 let month = now.date_naive().month() as i32;
162 let year = now.date_naive().year();
163
164 let mut monthly_usages = monthly_usage::Entity::find()
165 .filter(
166 monthly_usage::Column::UserId
167 .eq(user_id)
168 .and(monthly_usage::Column::Month.eq(month))
169 .and(monthly_usage::Column::Year.eq(year)),
170 )
171 .stream(&*tx)
172 .await?;
173 let mut monthly_spending = Cents::ZERO;
174
175 while let Some(usage) = monthly_usages.next().await {
176 let usage = usage?;
177 let Ok(model) = self.model_by_id(usage.model_id) else {
178 continue;
179 };
180
181 monthly_spending += calculate_spending(
182 model,
183 usage.input_tokens as usize,
184 usage.cache_creation_input_tokens as usize,
185 usage.cache_read_input_tokens as usize,
186 usage.output_tokens as usize,
187 );
188 }
189
190 Ok(monthly_spending)
191 })
192 .await
193 }
194
195 pub async fn get_usage(
196 &self,
197 user_id: UserId,
198 provider: LanguageModelProvider,
199 model_name: &str,
200 now: DateTimeUtc,
201 ) -> Result<Usage> {
202 self.transaction(|tx| async move {
203 let model = self
204 .models
205 .get(&(provider, model_name.to_string()))
206 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
207
208 let usages = usage::Entity::find()
209 .filter(
210 usage::Column::UserId
211 .eq(user_id)
212 .and(usage::Column::ModelId.eq(model.id)),
213 )
214 .all(&*tx)
215 .await?;
216
217 let month = now.date_naive().month() as i32;
218 let year = now.date_naive().year();
219 let monthly_usage = monthly_usage::Entity::find()
220 .filter(
221 monthly_usage::Column::UserId
222 .eq(user_id)
223 .and(monthly_usage::Column::ModelId.eq(model.id))
224 .and(monthly_usage::Column::Month.eq(month))
225 .and(monthly_usage::Column::Year.eq(year)),
226 )
227 .one(&*tx)
228 .await?;
229 let lifetime_usage = lifetime_usage::Entity::find()
230 .filter(
231 lifetime_usage::Column::UserId
232 .eq(user_id)
233 .and(lifetime_usage::Column::ModelId.eq(model.id)),
234 )
235 .one(&*tx)
236 .await?;
237
238 let requests_this_minute =
239 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
240 let tokens_this_minute =
241 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
242 let tokens_this_day =
243 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
244 let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
245 calculate_spending(
246 model,
247 monthly_usage.input_tokens as usize,
248 monthly_usage.cache_creation_input_tokens as usize,
249 monthly_usage.cache_read_input_tokens as usize,
250 monthly_usage.output_tokens as usize,
251 )
252 } else {
253 Cents::ZERO
254 };
255 let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
256 calculate_spending(
257 model,
258 lifetime_usage.input_tokens as usize,
259 lifetime_usage.cache_creation_input_tokens as usize,
260 lifetime_usage.cache_read_input_tokens as usize,
261 lifetime_usage.output_tokens as usize,
262 )
263 } else {
264 Cents::ZERO
265 };
266
267 Ok(Usage {
268 requests_this_minute,
269 tokens_this_minute,
270 tokens_this_day,
271 tokens_this_month: TokenUsage {
272 input: monthly_usage
273 .as_ref()
274 .map_or(0, |usage| usage.input_tokens as usize),
275 input_cache_creation: monthly_usage
276 .as_ref()
277 .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
278 input_cache_read: monthly_usage
279 .as_ref()
280 .map_or(0, |usage| usage.cache_read_input_tokens as usize),
281 output: monthly_usage
282 .as_ref()
283 .map_or(0, |usage| usage.output_tokens as usize),
284 },
285 spending_this_month,
286 lifetime_spending,
287 })
288 })
289 .await
290 }
291
    /// Records one request's token usage for `user_id` on the given model and
    /// returns the user's updated [`Usage`] for that model.
    ///
    /// Within a single transaction this:
    /// 1. adds one request and `tokens.total()` tokens to the sliding-window
    ///    measures (requests per minute, tokens per minute, tokens per day);
    /// 2. adds `tokens` to the user's monthly usage row for the calendar month
    ///    (UTC) of `now`, inserting the row if none exists;
    /// 3. inserts a billing event carrying this request's `tokens` when the user
    ///    is not staff, has an LLM subscription, and their monthly spending now
    ///    exceeds `free_tier_monthly_spending_limit` by at most `max_monthly_spend`;
    /// 4. adds `tokens` to the user's lifetime usage row, inserting it if none exists.
    ///
    /// NOTE(review): the monthly/lifetime updates are read-then-write in
    /// application code; whether concurrent calls for the same user/model can
    /// lose an update depends on the isolation level `transaction` uses — confirm.
    ///
    /// # Errors
    /// Returns an error if `self.model` cannot resolve the model, if a usage
    /// measure has no registered id, or if any database operation fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_usage(
        &self,
        user_id: UserId,
        is_staff: bool,
        provider: LanguageModelProvider,
        model_name: &str,
        tokens: TokenUsage,
        has_llm_subscription: bool,
        max_monthly_spend: Cents,
        free_tier_monthly_spending_limit: Cents,
        now: DateTimeUtc,
    ) -> Result<Usage> {
        self.transaction(|tx| async move {
            let model = self.model(provider, model_name)?;

            // All of the user's sliding-window rows for this model; each
            // `update_usage_for_measure` call picks out its own measure's row.
            let usages = usage::Entity::find()
                .filter(
                    usage::Column::UserId
                        .eq(user_id)
                        .and(usage::Column::ModelId.eq(model.id)),
                )
                .all(&*tx)
                .await?;

            // Each call counts this request as one request.
            let requests_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::RequestsPerMinute,
                    now,
                    1,
                    &tx,
                )
                .await?;
            let tokens_this_minute = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerMinute,
                    now,
                    tokens.total(),
                    &tx,
                )
                .await?;
            let tokens_this_day = self
                .update_usage_for_measure(
                    user_id,
                    is_staff,
                    model.id,
                    &usages,
                    UsageMeasure::TokensPerDay,
                    now,
                    tokens.total(),
                    &tx,
                )
                .await?;

            let month = now.date_naive().month() as i32;
            let year = now.date_naive().year();

            // Update monthly usage
            let monthly_usage = monthly_usage::Entity::find()
                .filter(
                    monthly_usage::Column::UserId
                        .eq(user_id)
                        .and(monthly_usage::Column::ModelId.eq(model.id))
                        .and(monthly_usage::Column::Month.eq(month))
                        .and(monthly_usage::Column::Year.eq(year)),
                )
                .one(&*tx)
                .await?;

            // Either bump the existing row's counters or create the row for this
            // month; both branches yield the row as it is after the write.
            let monthly_usage = match monthly_usage {
                Some(usage) => {
                    monthly_usage::Entity::update(monthly_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
                        ),
                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    monthly_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        month: ActiveValue::set(month),
                        year: ActiveValue::set(year),
                        input_tokens: ActiveValue::set(tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                        output_tokens: ActiveValue::set(tokens.output as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            // Month-to-date spending, including this request.
            let spending_this_month = calculate_spending(
                model,
                monthly_usage.input_tokens as usize,
                monthly_usage.cache_creation_input_tokens as usize,
                monthly_usage.cache_read_input_tokens as usize,
                monthly_usage.output_tokens as usize,
            );

            // Bill this request only for non-staff subscribers who are past the
            // free tier but still within `max_monthly_spend` above it; requests
            // that push spending beyond that cap get no billing event.
            if !is_staff
                && spending_this_month > free_tier_monthly_spending_limit
                && has_llm_subscription
                && (spending_this_month - free_tier_monthly_spending_limit) <= max_monthly_spend
            {
                billing_event::ActiveModel {
                    id: ActiveValue::not_set(),
                    idempotency_key: ActiveValue::not_set(),
                    user_id: ActiveValue::set(user_id),
                    model_id: ActiveValue::set(model.id),
                    input_tokens: ActiveValue::set(tokens.input as i64),
                    input_cache_creation_tokens: ActiveValue::set(
                        tokens.input_cache_creation as i64,
                    ),
                    input_cache_read_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                    output_tokens: ActiveValue::set(tokens.output as i64),
                }
                .insert(&*tx)
                .await?;
            }

            // Update lifetime usage
            let lifetime_usage = lifetime_usage::Entity::find()
                .filter(
                    lifetime_usage::Column::UserId
                        .eq(user_id)
                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
                )
                .one(&*tx)
                .await?;

            // Same update-or-insert pattern as the monthly row above.
            let lifetime_usage = match lifetime_usage {
                Some(usage) => {
                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
                        id: ActiveValue::unchanged(usage.id),
                        input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(
                            usage.cache_read_input_tokens + tokens.input_cache_read as i64,
                        ),
                        output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?
                }
                None => {
                    lifetime_usage::ActiveModel {
                        user_id: ActiveValue::set(user_id),
                        model_id: ActiveValue::set(model.id),
                        input_tokens: ActiveValue::set(tokens.input as i64),
                        cache_creation_input_tokens: ActiveValue::set(
                            tokens.input_cache_creation as i64,
                        ),
                        cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
                        output_tokens: ActiveValue::set(tokens.output as i64),
                        ..Default::default()
                    }
                    .insert(&*tx)
                    .await?
                }
            };

            let lifetime_spending = calculate_spending(
                model,
                lifetime_usage.input_tokens as usize,
                lifetime_usage.cache_creation_input_tokens as usize,
                lifetime_usage.cache_read_input_tokens as usize,
                lifetime_usage.output_tokens as usize,
            );

            Ok(Usage {
                requests_this_minute,
                tokens_this_minute,
                tokens_this_day,
                tokens_this_month: TokenUsage {
                    input: monthly_usage.input_tokens as usize,
                    input_cache_creation: monthly_usage.cache_creation_input_tokens as usize,
                    input_cache_read: monthly_usage.cache_read_input_tokens as usize,
                    output: monthly_usage.output_tokens as usize,
                },
                spending_this_month,
                lifetime_spending,
            })
        })
        .await
    }
502
503 /// Returns the active user count for the specified model.
504 pub async fn get_active_user_count(
505 &self,
506 provider: LanguageModelProvider,
507 model_name: &str,
508 now: DateTimeUtc,
509 ) -> Result<ActiveUserCount> {
510 self.transaction(|tx| async move {
511 let minute_since = now - Duration::minutes(5);
512 let day_since = now - Duration::days(5);
513
514 let model = self
515 .models
516 .get(&(provider, model_name.to_string()))
517 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
518
519 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
520
521 let users_in_recent_minutes = usage::Entity::find()
522 .filter(
523 usage::Column::ModelId
524 .eq(model.id)
525 .and(usage::Column::MeasureId.eq(tokens_per_minute))
526 .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
527 .and(usage::Column::IsStaff.eq(false)),
528 )
529 .select_only()
530 .column(usage::Column::UserId)
531 .group_by(usage::Column::UserId)
532 .count(&*tx)
533 .await? as usize;
534
535 let users_in_recent_days = usage::Entity::find()
536 .filter(
537 usage::Column::ModelId
538 .eq(model.id)
539 .and(usage::Column::MeasureId.eq(tokens_per_minute))
540 .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
541 .and(usage::Column::IsStaff.eq(false)),
542 )
543 .select_only()
544 .column(usage::Column::UserId)
545 .group_by(usage::Column::UserId)
546 .count(&*tx)
547 .await? as usize;
548
549 Ok(ActiveUserCount {
550 users_in_recent_minutes,
551 users_in_recent_days,
552 })
553 })
554 .await
555 }
556
    /// Adds `usage_to_add` to the newest bucket of `usage_measure` for the given
    /// user and model, writes the row back, and returns the sum of the row's
    /// buckets after the addition.
    ///
    /// `usages` is searched for the row whose `measure_id` matches this measure;
    /// the caller passes the user's rows for `model_id`. If a matching row exists
    /// it is updated in place (even when its window has fully expired); otherwise
    /// a new row is inserted.
    ///
    /// # Errors
    /// Returns an error if `usage_measure` has no registered id, or if the
    /// database update/insert fails.
    #[allow(clippy::too_many_arguments)]
    async fn update_usage_for_measure(
        &self,
        user_id: UserId,
        is_staff: bool,
        model_id: ModelId,
        usages: &[usage::Model],
        usage_measure: UsageMeasure,
        now: DateTimeUtc,
        usage_to_add: usize,
        tx: &DatabaseTransaction,
    ) -> Result<usize> {
        let now = now.naive_utc();
        let measure_id = *self
            .usage_measure_ids
            .get(&usage_measure)
            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;

        // Defaults for a new row, or for an existing row with no live buckets:
        // a single empty bucket anchored at `now`.
        let mut id = None;
        let mut timestamp = now;
        let mut buckets = vec![0_i64];

        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
            id = Some(old_usage.id);
            let (live_buckets, buckets_since) =
                Self::get_live_buckets(old_usage, now, usage_measure);
            if !live_buckets.is_empty() {
                // Keep the still-live buckets, then open one empty bucket per
                // bucket period elapsed since the row's timestamp, and move the
                // timestamp forward by the same number of periods.
                buckets.clear();
                buckets.extend_from_slice(live_buckets);
                buckets.extend(iter::repeat(0).take(buckets_since));
                // `buckets_since` is rounded up by `get_live_buckets`, so this
                // timestamp can end up later than `now` by less than one bucket.
                timestamp =
                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
            }
        }

        // `buckets` always holds at least one element here, so `unwrap` cannot fail.
        *buckets.last_mut().unwrap() += usage_to_add as i64;
        let total_usage = buckets.iter().sum::<i64>() as usize;

        let mut model = usage::ActiveModel {
            user_id: ActiveValue::set(user_id),
            is_staff: ActiveValue::set(is_staff),
            model_id: ActiveValue::set(model_id),
            measure_id: ActiveValue::set(measure_id),
            timestamp: ActiveValue::set(timestamp),
            buckets: ActiveValue::set(buckets),
            ..Default::default()
        };

        if let Some(id) = id {
            model.id = ActiveValue::unchanged(id);
            model.update(tx).await?;
        } else {
            usage::Entity::insert(model)
                .exec_without_returning(tx)
                .await?;
        }

        Ok(total_usage)
    }
616
617 fn get_usage_for_measure(
618 &self,
619 usages: &[usage::Model],
620 now: DateTimeUtc,
621 usage_measure: UsageMeasure,
622 ) -> Result<usize> {
623 let now = now.naive_utc();
624 let measure_id = *self
625 .usage_measure_ids
626 .get(&usage_measure)
627 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
628 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
629 return Ok(0);
630 };
631
632 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
633 Ok(live_buckets.iter().sum::<i64>() as _)
634 }
635
636 fn get_live_buckets(
637 usage: &usage::Model,
638 now: chrono::NaiveDateTime,
639 measure: UsageMeasure,
640 ) -> (&[i64], usize) {
641 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
642 let buckets_since_usage =
643 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
644 let buckets_since_usage = buckets_since_usage.ceil() as usize;
645 let mut live_buckets = &[] as &[i64];
646 if buckets_since_usage < measure.bucket_count() {
647 let expired_bucket_count =
648 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
649 live_buckets = &usage.buckets[expired_bucket_count..];
650 while live_buckets.first() == Some(&0) {
651 live_buckets = &live_buckets[1..];
652 }
653 }
654 (live_buckets, buckets_since_usage)
655 }
656}
657
658fn calculate_spending(
659 model: &model::Model,
660 input_tokens_this_month: usize,
661 cache_creation_input_tokens_this_month: usize,
662 cache_read_input_tokens_this_month: usize,
663 output_tokens_this_month: usize,
664) -> Cents {
665 let input_token_cost =
666 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
667 let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
668 * model.price_per_million_cache_creation_input_tokens as usize
669 / 1_000_000;
670 let cache_read_input_token_cost = cache_read_input_tokens_this_month
671 * model.price_per_million_cache_read_input_tokens as usize
672 / 1_000_000;
673 let output_token_cost =
674 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
675 let spending = input_token_cost
676 + cache_creation_input_token_cost
677 + cache_read_input_token_cost
678 + output_token_cost;
679 Cents::new(spending as u32)
680}
681
/// Buckets in the one-minute window of the per-minute measures
/// (`RequestsPerMinute`, `TokensPerMinute`): 60 s / 12 = 5 s per bucket.
const MINUTE_BUCKET_COUNT: usize = 12;
/// Buckets in the 24-hour window of `TokensPerDay`: 24 h / 48 = 30 min per bucket.
const DAY_BUCKET_COUNT: usize = 48;
684
685impl UsageMeasure {
686 fn bucket_count(&self) -> usize {
687 match self {
688 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
689 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
690 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
691 }
692 }
693
694 fn total_duration(&self) -> Duration {
695 match self {
696 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
697 UsageMeasure::TokensPerMinute => Duration::minutes(1),
698 UsageMeasure::TokensPerDay => Duration::hours(24),
699 }
700 }
701
702 fn bucket_duration(&self) -> Duration {
703 self.total_duration() / self.bucket_count() as i32
704 }
705}