1use crate::db::UserId;
2use crate::llm::Cents;
3use chrono::{Datelike, Duration};
4use futures::StreamExt as _;
5use rpc::LanguageModelProvider;
6use sea_orm::QuerySelect;
7use std::{iter, str::FromStr};
8use strum::IntoEnumIterator as _;
9
10use super::*;
11
/// Token counts for LLM usage, broken down by how each kind of token is priced.
///
/// Each kind has its own per-million-token price on the model (see `calculate_spending`);
/// [`TokenUsage::total`] adds all four kinds together.
#[derive(Debug, PartialEq, Clone, Copy, Default, serde::Serialize)]
pub struct TokenUsage {
    /// Input tokens, priced at the model's `price_per_million_input_tokens`.
    pub input: usize,
    /// Input tokens priced at the model's cache-creation rate
    /// (`price_per_million_cache_creation_input_tokens`).
    pub input_cache_creation: usize,
    /// Input tokens priced at the model's cache-read rate
    /// (`price_per_million_cache_read_input_tokens`).
    pub input_cache_read: usize,
    /// Output tokens, priced at the model's `price_per_million_output_tokens`.
    pub output: usize,
}
19
20impl TokenUsage {
21 pub fn total(&self) -> usize {
22 self.input + self.input_cache_creation + self.input_cache_read + self.output
23 }
24}
25
/// A single user's usage of a single model, as returned by `LlmDatabase::get_usage` and
/// `LlmDatabase::record_usage`.
#[derive(Debug, PartialEq, Clone, Copy, serde::Serialize)]
pub struct Usage {
    /// Requests made within the sliding one-minute window.
    pub requests_this_minute: usize,
    /// Tokens (all kinds combined) consumed within the sliding one-minute window.
    pub tokens_this_minute: usize,
    /// Tokens (all kinds combined) consumed within the sliding 24-hour window.
    pub tokens_this_day: usize,
    /// Tokens consumed during the current calendar month (UTC), broken down by kind.
    pub tokens_this_month: TokenUsage,
    /// Cost of `tokens_this_month` at the model's prices.
    pub spending_this_month: Cents,
    /// Cost of every token ever recorded for this user and model.
    pub lifetime_spending: Cents,
}
35
/// Usage of one model across all non-staff users within the last minute, as returned by
/// `LlmDatabase::get_application_wide_usages_by_model`.
#[derive(Debug, PartialEq, Clone)]
pub struct ApplicationWideUsage {
    /// Provider serving the model.
    pub provider: LanguageModelProvider,
    /// Name of the model.
    pub model: String,
    /// Requests made by non-staff users within the last minute.
    pub requests_this_minute: usize,
    /// Tokens consumed by non-staff users within the last minute.
    pub tokens_this_minute: usize,
}
43
/// Number of non-staff users who have recently used a model, as returned by
/// `LlmDatabase::get_active_user_count`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ActiveUserCount {
    /// Users with activity on the model within the last five minutes.
    pub users_in_recent_minutes: usize,
    /// Users with activity on the model within the last five days.
    pub users_in_recent_days: usize,
}
49
50impl LlmDatabase {
51 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
52 let all_measures = self
53 .transaction(|tx| async move {
54 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
55
56 let new_measures = UsageMeasure::iter()
57 .filter(|measure| {
58 !existing_measures
59 .iter()
60 .any(|m| m.name == measure.to_string())
61 })
62 .map(|measure| usage_measure::ActiveModel {
63 name: ActiveValue::set(measure.to_string()),
64 ..Default::default()
65 })
66 .collect::<Vec<_>>();
67
68 if !new_measures.is_empty() {
69 usage_measure::Entity::insert_many(new_measures)
70 .exec(&*tx)
71 .await?;
72 }
73
74 Ok(usage_measure::Entity::find().all(&*tx).await?)
75 })
76 .await?;
77
78 self.usage_measure_ids = all_measures
79 .into_iter()
80 .filter_map(|measure| {
81 UsageMeasure::from_str(&measure.name)
82 .ok()
83 .map(|um| (um, measure.id))
84 })
85 .collect();
86 Ok(())
87 }
88
89 pub async fn get_application_wide_usages_by_model(
90 &self,
91 now: DateTimeUtc,
92 ) -> Result<Vec<ApplicationWideUsage>> {
93 self.transaction(|tx| async move {
94 let past_minute = now - Duration::minutes(1);
95 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
96 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
97
98 let mut results = Vec::new();
99 for ((provider, model_name), model) in self.models.iter() {
100 let mut usages = usage::Entity::find()
101 .filter(
102 usage::Column::Timestamp
103 .gte(past_minute.naive_utc())
104 .and(usage::Column::IsStaff.eq(false))
105 .and(usage::Column::ModelId.eq(model.id))
106 .and(
107 usage::Column::MeasureId
108 .eq(requests_per_minute)
109 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
110 ),
111 )
112 .stream(&*tx)
113 .await?;
114
115 let mut requests_this_minute = 0;
116 let mut tokens_this_minute = 0;
117 while let Some(usage) = usages.next().await {
118 let usage = usage?;
119 if usage.measure_id == requests_per_minute {
120 requests_this_minute += Self::get_live_buckets(
121 &usage,
122 now.naive_utc(),
123 UsageMeasure::RequestsPerMinute,
124 )
125 .0
126 .iter()
127 .copied()
128 .sum::<i64>() as usize;
129 } else if usage.measure_id == tokens_per_minute {
130 tokens_this_minute += Self::get_live_buckets(
131 &usage,
132 now.naive_utc(),
133 UsageMeasure::TokensPerMinute,
134 )
135 .0
136 .iter()
137 .copied()
138 .sum::<i64>() as usize;
139 }
140 }
141
142 results.push(ApplicationWideUsage {
143 provider: *provider,
144 model: model_name.clone(),
145 requests_this_minute,
146 tokens_this_minute,
147 })
148 }
149
150 Ok(results)
151 })
152 .await
153 }
154
155 pub async fn get_user_spending_for_month(
156 &self,
157 user_id: UserId,
158 now: DateTimeUtc,
159 ) -> Result<Cents> {
160 self.transaction(|tx| async move {
161 let month = now.date_naive().month() as i32;
162 let year = now.date_naive().year();
163
164 let mut monthly_usages = monthly_usage::Entity::find()
165 .filter(
166 monthly_usage::Column::UserId
167 .eq(user_id)
168 .and(monthly_usage::Column::Month.eq(month))
169 .and(monthly_usage::Column::Year.eq(year)),
170 )
171 .stream(&*tx)
172 .await?;
173 let mut monthly_spending = Cents::ZERO;
174
175 while let Some(usage) = monthly_usages.next().await {
176 let usage = usage?;
177 let Ok(model) = self.model_by_id(usage.model_id) else {
178 continue;
179 };
180
181 monthly_spending += calculate_spending(
182 model,
183 usage.input_tokens as usize,
184 usage.cache_creation_input_tokens as usize,
185 usage.cache_read_input_tokens as usize,
186 usage.output_tokens as usize,
187 );
188 }
189
190 Ok(monthly_spending)
191 })
192 .await
193 }
194
195 pub async fn get_usage(
196 &self,
197 user_id: UserId,
198 provider: LanguageModelProvider,
199 model_name: &str,
200 now: DateTimeUtc,
201 ) -> Result<Usage> {
202 self.transaction(|tx| async move {
203 let model = self
204 .models
205 .get(&(provider, model_name.to_string()))
206 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
207
208 let usages = usage::Entity::find()
209 .filter(
210 usage::Column::UserId
211 .eq(user_id)
212 .and(usage::Column::ModelId.eq(model.id)),
213 )
214 .all(&*tx)
215 .await?;
216
217 let month = now.date_naive().month() as i32;
218 let year = now.date_naive().year();
219 let monthly_usage = monthly_usage::Entity::find()
220 .filter(
221 monthly_usage::Column::UserId
222 .eq(user_id)
223 .and(monthly_usage::Column::ModelId.eq(model.id))
224 .and(monthly_usage::Column::Month.eq(month))
225 .and(monthly_usage::Column::Year.eq(year)),
226 )
227 .one(&*tx)
228 .await?;
229 let lifetime_usage = lifetime_usage::Entity::find()
230 .filter(
231 lifetime_usage::Column::UserId
232 .eq(user_id)
233 .and(lifetime_usage::Column::ModelId.eq(model.id)),
234 )
235 .one(&*tx)
236 .await?;
237
238 let requests_this_minute =
239 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
240 let tokens_this_minute =
241 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
242 let tokens_this_day =
243 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
244 let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
245 calculate_spending(
246 model,
247 monthly_usage.input_tokens as usize,
248 monthly_usage.cache_creation_input_tokens as usize,
249 monthly_usage.cache_read_input_tokens as usize,
250 monthly_usage.output_tokens as usize,
251 )
252 } else {
253 Cents::ZERO
254 };
255 let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
256 calculate_spending(
257 model,
258 lifetime_usage.input_tokens as usize,
259 lifetime_usage.cache_creation_input_tokens as usize,
260 lifetime_usage.cache_read_input_tokens as usize,
261 lifetime_usage.output_tokens as usize,
262 )
263 } else {
264 Cents::ZERO
265 };
266
267 Ok(Usage {
268 requests_this_minute,
269 tokens_this_minute,
270 tokens_this_day,
271 tokens_this_month: TokenUsage {
272 input: monthly_usage
273 .as_ref()
274 .map_or(0, |usage| usage.input_tokens as usize),
275 input_cache_creation: monthly_usage
276 .as_ref()
277 .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
278 input_cache_read: monthly_usage
279 .as_ref()
280 .map_or(0, |usage| usage.cache_read_input_tokens as usize),
281 output: monthly_usage
282 .as_ref()
283 .map_or(0, |usage| usage.output_tokens as usize),
284 },
285 spending_this_month,
286 lifetime_spending,
287 })
288 })
289 .await
290 }
291
292 pub async fn record_usage(
293 &self,
294 user_id: UserId,
295 is_staff: bool,
296 provider: LanguageModelProvider,
297 model_name: &str,
298 tokens: TokenUsage,
299 has_llm_subscription: bool,
300 max_monthly_spend: Cents,
301 free_tier_monthly_spending_limit: Cents,
302 now: DateTimeUtc,
303 ) -> Result<Usage> {
304 self.transaction(|tx| async move {
305 let model = self.model(provider, model_name)?;
306
307 let usages = usage::Entity::find()
308 .filter(
309 usage::Column::UserId
310 .eq(user_id)
311 .and(usage::Column::ModelId.eq(model.id)),
312 )
313 .all(&*tx)
314 .await?;
315
316 let requests_this_minute = self
317 .update_usage_for_measure(
318 user_id,
319 is_staff,
320 model.id,
321 &usages,
322 UsageMeasure::RequestsPerMinute,
323 now,
324 1,
325 &tx,
326 )
327 .await?;
328 let tokens_this_minute = self
329 .update_usage_for_measure(
330 user_id,
331 is_staff,
332 model.id,
333 &usages,
334 UsageMeasure::TokensPerMinute,
335 now,
336 tokens.total(),
337 &tx,
338 )
339 .await?;
340 let tokens_this_day = self
341 .update_usage_for_measure(
342 user_id,
343 is_staff,
344 model.id,
345 &usages,
346 UsageMeasure::TokensPerDay,
347 now,
348 tokens.total(),
349 &tx,
350 )
351 .await?;
352
353 let month = now.date_naive().month() as i32;
354 let year = now.date_naive().year();
355
356 // Update monthly usage
357 let monthly_usage = monthly_usage::Entity::find()
358 .filter(
359 monthly_usage::Column::UserId
360 .eq(user_id)
361 .and(monthly_usage::Column::ModelId.eq(model.id))
362 .and(monthly_usage::Column::Month.eq(month))
363 .and(monthly_usage::Column::Year.eq(year)),
364 )
365 .one(&*tx)
366 .await?;
367
368 let monthly_usage = match monthly_usage {
369 Some(usage) => {
370 monthly_usage::Entity::update(monthly_usage::ActiveModel {
371 id: ActiveValue::unchanged(usage.id),
372 input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
373 cache_creation_input_tokens: ActiveValue::set(
374 usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
375 ),
376 cache_read_input_tokens: ActiveValue::set(
377 usage.cache_read_input_tokens + tokens.input_cache_read as i64,
378 ),
379 output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
380 ..Default::default()
381 })
382 .exec(&*tx)
383 .await?
384 }
385 None => {
386 monthly_usage::ActiveModel {
387 user_id: ActiveValue::set(user_id),
388 model_id: ActiveValue::set(model.id),
389 month: ActiveValue::set(month),
390 year: ActiveValue::set(year),
391 input_tokens: ActiveValue::set(tokens.input as i64),
392 cache_creation_input_tokens: ActiveValue::set(
393 tokens.input_cache_creation as i64,
394 ),
395 cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
396 output_tokens: ActiveValue::set(tokens.output as i64),
397 ..Default::default()
398 }
399 .insert(&*tx)
400 .await?
401 }
402 };
403
404 let spending_this_month = calculate_spending(
405 model,
406 monthly_usage.input_tokens as usize,
407 monthly_usage.cache_creation_input_tokens as usize,
408 monthly_usage.cache_read_input_tokens as usize,
409 monthly_usage.output_tokens as usize,
410 );
411
412 if !is_staff
413 && spending_this_month > free_tier_monthly_spending_limit
414 && has_llm_subscription
415 && (spending_this_month - free_tier_monthly_spending_limit) <= max_monthly_spend
416 {
417 billing_event::ActiveModel {
418 id: ActiveValue::not_set(),
419 idempotency_key: ActiveValue::not_set(),
420 user_id: ActiveValue::set(user_id),
421 model_id: ActiveValue::set(model.id),
422 input_tokens: ActiveValue::set(tokens.input as i64),
423 input_cache_creation_tokens: ActiveValue::set(
424 tokens.input_cache_creation as i64,
425 ),
426 input_cache_read_tokens: ActiveValue::set(tokens.input_cache_read as i64),
427 output_tokens: ActiveValue::set(tokens.output as i64),
428 }
429 .insert(&*tx)
430 .await?;
431 }
432
433 // Update lifetime usage
434 let lifetime_usage = lifetime_usage::Entity::find()
435 .filter(
436 lifetime_usage::Column::UserId
437 .eq(user_id)
438 .and(lifetime_usage::Column::ModelId.eq(model.id)),
439 )
440 .one(&*tx)
441 .await?;
442
443 let lifetime_usage = match lifetime_usage {
444 Some(usage) => {
445 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
446 id: ActiveValue::unchanged(usage.id),
447 input_tokens: ActiveValue::set(usage.input_tokens + tokens.input as i64),
448 cache_creation_input_tokens: ActiveValue::set(
449 usage.cache_creation_input_tokens + tokens.input_cache_creation as i64,
450 ),
451 cache_read_input_tokens: ActiveValue::set(
452 usage.cache_read_input_tokens + tokens.input_cache_read as i64,
453 ),
454 output_tokens: ActiveValue::set(usage.output_tokens + tokens.output as i64),
455 ..Default::default()
456 })
457 .exec(&*tx)
458 .await?
459 }
460 None => {
461 lifetime_usage::ActiveModel {
462 user_id: ActiveValue::set(user_id),
463 model_id: ActiveValue::set(model.id),
464 input_tokens: ActiveValue::set(tokens.input as i64),
465 cache_creation_input_tokens: ActiveValue::set(
466 tokens.input_cache_creation as i64,
467 ),
468 cache_read_input_tokens: ActiveValue::set(tokens.input_cache_read as i64),
469 output_tokens: ActiveValue::set(tokens.output as i64),
470 ..Default::default()
471 }
472 .insert(&*tx)
473 .await?
474 }
475 };
476
477 let lifetime_spending = calculate_spending(
478 model,
479 lifetime_usage.input_tokens as usize,
480 lifetime_usage.cache_creation_input_tokens as usize,
481 lifetime_usage.cache_read_input_tokens as usize,
482 lifetime_usage.output_tokens as usize,
483 );
484
485 Ok(Usage {
486 requests_this_minute,
487 tokens_this_minute,
488 tokens_this_day,
489 tokens_this_month: TokenUsage {
490 input: monthly_usage.input_tokens as usize,
491 input_cache_creation: monthly_usage.cache_creation_input_tokens as usize,
492 input_cache_read: monthly_usage.cache_read_input_tokens as usize,
493 output: monthly_usage.output_tokens as usize,
494 },
495 spending_this_month,
496 lifetime_spending,
497 })
498 })
499 .await
500 }
501
502 /// Returns the active user count for the specified model.
503 pub async fn get_active_user_count(
504 &self,
505 provider: LanguageModelProvider,
506 model_name: &str,
507 now: DateTimeUtc,
508 ) -> Result<ActiveUserCount> {
509 self.transaction(|tx| async move {
510 let minute_since = now - Duration::minutes(5);
511 let day_since = now - Duration::days(5);
512
513 let model = self
514 .models
515 .get(&(provider, model_name.to_string()))
516 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
517
518 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
519
520 let users_in_recent_minutes = usage::Entity::find()
521 .filter(
522 usage::Column::ModelId
523 .eq(model.id)
524 .and(usage::Column::MeasureId.eq(tokens_per_minute))
525 .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
526 .and(usage::Column::IsStaff.eq(false)),
527 )
528 .select_only()
529 .column(usage::Column::UserId)
530 .group_by(usage::Column::UserId)
531 .count(&*tx)
532 .await? as usize;
533
534 let users_in_recent_days = usage::Entity::find()
535 .filter(
536 usage::Column::ModelId
537 .eq(model.id)
538 .and(usage::Column::MeasureId.eq(tokens_per_minute))
539 .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
540 .and(usage::Column::IsStaff.eq(false)),
541 )
542 .select_only()
543 .column(usage::Column::UserId)
544 .group_by(usage::Column::UserId)
545 .count(&*tx)
546 .await? as usize;
547
548 Ok(ActiveUserCount {
549 users_in_recent_minutes,
550 users_in_recent_days,
551 })
552 })
553 .await
554 }
555
556 async fn update_usage_for_measure(
557 &self,
558 user_id: UserId,
559 is_staff: bool,
560 model_id: ModelId,
561 usages: &[usage::Model],
562 usage_measure: UsageMeasure,
563 now: DateTimeUtc,
564 usage_to_add: usize,
565 tx: &DatabaseTransaction,
566 ) -> Result<usize> {
567 let now = now.naive_utc();
568 let measure_id = *self
569 .usage_measure_ids
570 .get(&usage_measure)
571 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
572
573 let mut id = None;
574 let mut timestamp = now;
575 let mut buckets = vec![0_i64];
576
577 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
578 id = Some(old_usage.id);
579 let (live_buckets, buckets_since) =
580 Self::get_live_buckets(old_usage, now, usage_measure);
581 if !live_buckets.is_empty() {
582 buckets.clear();
583 buckets.extend_from_slice(live_buckets);
584 buckets.extend(iter::repeat(0).take(buckets_since));
585 timestamp =
586 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
587 }
588 }
589
590 *buckets.last_mut().unwrap() += usage_to_add as i64;
591 let total_usage = buckets.iter().sum::<i64>() as usize;
592
593 let mut model = usage::ActiveModel {
594 user_id: ActiveValue::set(user_id),
595 is_staff: ActiveValue::set(is_staff),
596 model_id: ActiveValue::set(model_id),
597 measure_id: ActiveValue::set(measure_id),
598 timestamp: ActiveValue::set(timestamp),
599 buckets: ActiveValue::set(buckets),
600 ..Default::default()
601 };
602
603 if let Some(id) = id {
604 model.id = ActiveValue::unchanged(id);
605 model.update(tx).await?;
606 } else {
607 usage::Entity::insert(model)
608 .exec_without_returning(tx)
609 .await?;
610 }
611
612 Ok(total_usage)
613 }
614
615 fn get_usage_for_measure(
616 &self,
617 usages: &[usage::Model],
618 now: DateTimeUtc,
619 usage_measure: UsageMeasure,
620 ) -> Result<usize> {
621 let now = now.naive_utc();
622 let measure_id = *self
623 .usage_measure_ids
624 .get(&usage_measure)
625 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
626 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
627 return Ok(0);
628 };
629
630 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
631 Ok(live_buckets.iter().sum::<i64>() as _)
632 }
633
634 fn get_live_buckets(
635 usage: &usage::Model,
636 now: chrono::NaiveDateTime,
637 measure: UsageMeasure,
638 ) -> (&[i64], usize) {
639 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
640 let buckets_since_usage =
641 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
642 let buckets_since_usage = buckets_since_usage.ceil() as usize;
643 let mut live_buckets = &[] as &[i64];
644 if buckets_since_usage < measure.bucket_count() {
645 let expired_bucket_count =
646 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
647 live_buckets = &usage.buckets[expired_bucket_count..];
648 while live_buckets.first() == Some(&0) {
649 live_buckets = &live_buckets[1..];
650 }
651 }
652 (live_buckets, buckets_since_usage)
653 }
654}
655
656fn calculate_spending(
657 model: &model::Model,
658 input_tokens_this_month: usize,
659 cache_creation_input_tokens_this_month: usize,
660 cache_read_input_tokens_this_month: usize,
661 output_tokens_this_month: usize,
662) -> Cents {
663 let input_token_cost =
664 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
665 let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
666 * model.price_per_million_cache_creation_input_tokens as usize
667 / 1_000_000;
668 let cache_read_input_token_cost = cache_read_input_tokens_this_month
669 * model.price_per_million_cache_read_input_tokens as usize
670 / 1_000_000;
671 let output_token_cost =
672 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
673 let spending = input_token_cost
674 + cache_creation_input_token_cost
675 + cache_read_input_token_cost
676 + output_token_cost;
677 Cents::new(spending as u32)
678}
679
680const MINUTE_BUCKET_COUNT: usize = 12;
681const DAY_BUCKET_COUNT: usize = 48;
682
683impl UsageMeasure {
684 fn bucket_count(&self) -> usize {
685 match self {
686 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
687 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
688 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
689 }
690 }
691
692 fn total_duration(&self) -> Duration {
693 match self {
694 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
695 UsageMeasure::TokensPerMinute => Duration::minutes(1),
696 UsageMeasure::TokensPerDay => Duration::hours(24),
697 }
698 }
699
700 fn bucket_duration(&self) -> Duration {
701 self.total_duration() / self.bucket_count() as i32
702 }
703}