1use crate::db::UserId;
2use chrono::{Datelike, Duration};
3use futures::StreamExt as _;
4use rpc::LanguageModelProvider;
5use sea_orm::QuerySelect;
6use std::{iter, str::FromStr};
7use strum::IntoEnumIterator as _;
8
9use super::*;
10
/// A snapshot of one user's usage of a single model.
///
/// The `*_this_minute` / `*_this_day` values are rolling-window totals; the
/// `*_this_month` values are the calendar-month totals; spending values are
/// produced by `calculate_spending`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Usage {
    /// Requests made in the rolling one-minute window.
    pub requests_this_minute: usize,
    /// Tokens (input, cache and output combined) in the rolling one-minute window.
    pub tokens_this_minute: usize,
    /// Tokens (input, cache and output combined) in the rolling 24-hour window.
    pub tokens_this_day: usize,
    /// Plain input tokens recorded this calendar month.
    pub input_tokens_this_month: usize,
    /// Cache-creation input tokens recorded this calendar month.
    pub cache_creation_input_tokens_this_month: usize,
    /// Cache-read input tokens recorded this calendar month.
    pub cache_read_input_tokens_this_month: usize,
    /// Output tokens recorded this calendar month.
    pub output_tokens_this_month: usize,
    /// Spending derived from this month's token counts.
    pub spending_this_month: usize,
    /// Spending derived from all tokens ever recorded for this model.
    pub lifetime_spending: usize,
}
23
24#[derive(Debug, PartialEq, Clone)]
25pub struct ApplicationWideUsage {
26 pub provider: LanguageModelProvider,
27 pub model: String,
28 pub requests_this_minute: usize,
29 pub tokens_this_minute: usize,
30}
31
/// Number of distinct non-staff users recently active on a model.
#[derive(Debug, Default, Clone, Copy)]
pub struct ActiveUserCount {
    /// Distinct users active within the short (minutes-scale) look-back.
    pub users_in_recent_minutes: usize,
    /// Distinct users active within the long (days-scale) look-back.
    pub users_in_recent_days: usize,
}
37
38impl LlmDatabase {
39 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
40 let all_measures = self
41 .transaction(|tx| async move {
42 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
43
44 let new_measures = UsageMeasure::iter()
45 .filter(|measure| {
46 !existing_measures
47 .iter()
48 .any(|m| m.name == measure.to_string())
49 })
50 .map(|measure| usage_measure::ActiveModel {
51 name: ActiveValue::set(measure.to_string()),
52 ..Default::default()
53 })
54 .collect::<Vec<_>>();
55
56 if !new_measures.is_empty() {
57 usage_measure::Entity::insert_many(new_measures)
58 .exec(&*tx)
59 .await?;
60 }
61
62 Ok(usage_measure::Entity::find().all(&*tx).await?)
63 })
64 .await?;
65
66 self.usage_measure_ids = all_measures
67 .into_iter()
68 .filter_map(|measure| {
69 UsageMeasure::from_str(&measure.name)
70 .ok()
71 .map(|um| (um, measure.id))
72 })
73 .collect();
74 Ok(())
75 }
76
77 pub async fn get_application_wide_usages_by_model(
78 &self,
79 now: DateTimeUtc,
80 ) -> Result<Vec<ApplicationWideUsage>> {
81 self.transaction(|tx| async move {
82 let past_minute = now - Duration::minutes(1);
83 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
84 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
85
86 let mut results = Vec::new();
87 for ((provider, model_name), model) in self.models.iter() {
88 let mut usages = usage::Entity::find()
89 .filter(
90 usage::Column::Timestamp
91 .gte(past_minute.naive_utc())
92 .and(usage::Column::IsStaff.eq(false))
93 .and(usage::Column::ModelId.eq(model.id))
94 .and(
95 usage::Column::MeasureId
96 .eq(requests_per_minute)
97 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
98 ),
99 )
100 .stream(&*tx)
101 .await?;
102
103 let mut requests_this_minute = 0;
104 let mut tokens_this_minute = 0;
105 while let Some(usage) = usages.next().await {
106 let usage = usage?;
107 if usage.measure_id == requests_per_minute {
108 requests_this_minute += Self::get_live_buckets(
109 &usage,
110 now.naive_utc(),
111 UsageMeasure::RequestsPerMinute,
112 )
113 .0
114 .iter()
115 .copied()
116 .sum::<i64>() as usize;
117 } else if usage.measure_id == tokens_per_minute {
118 tokens_this_minute += Self::get_live_buckets(
119 &usage,
120 now.naive_utc(),
121 UsageMeasure::TokensPerMinute,
122 )
123 .0
124 .iter()
125 .copied()
126 .sum::<i64>() as usize;
127 }
128 }
129
130 results.push(ApplicationWideUsage {
131 provider: *provider,
132 model: model_name.clone(),
133 requests_this_minute,
134 tokens_this_minute,
135 })
136 }
137
138 Ok(results)
139 })
140 .await
141 }
142
143 pub async fn get_user_spending_for_month(
144 &self,
145 user_id: UserId,
146 now: DateTimeUtc,
147 ) -> Result<usize> {
148 self.transaction(|tx| async move {
149 let month = now.date_naive().month() as i32;
150 let year = now.date_naive().year();
151
152 let mut monthly_usages = monthly_usage::Entity::find()
153 .filter(
154 monthly_usage::Column::UserId
155 .eq(user_id)
156 .and(monthly_usage::Column::Month.eq(month))
157 .and(monthly_usage::Column::Year.eq(year)),
158 )
159 .stream(&*tx)
160 .await?;
161 let mut monthly_spending_in_cents = 0;
162
163 while let Some(usage) = monthly_usages.next().await {
164 let usage = usage?;
165 let Ok(model) = self.model_by_id(usage.model_id) else {
166 continue;
167 };
168
169 monthly_spending_in_cents += calculate_spending(
170 model,
171 usage.input_tokens as usize,
172 usage.cache_creation_input_tokens as usize,
173 usage.cache_read_input_tokens as usize,
174 usage.output_tokens as usize,
175 );
176 }
177
178 Ok(monthly_spending_in_cents)
179 })
180 .await
181 }
182
183 pub async fn get_usage(
184 &self,
185 user_id: UserId,
186 provider: LanguageModelProvider,
187 model_name: &str,
188 now: DateTimeUtc,
189 ) -> Result<Usage> {
190 self.transaction(|tx| async move {
191 let model = self
192 .models
193 .get(&(provider, model_name.to_string()))
194 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
195
196 let usages = usage::Entity::find()
197 .filter(
198 usage::Column::UserId
199 .eq(user_id)
200 .and(usage::Column::ModelId.eq(model.id)),
201 )
202 .all(&*tx)
203 .await?;
204
205 let month = now.date_naive().month() as i32;
206 let year = now.date_naive().year();
207 let monthly_usage = monthly_usage::Entity::find()
208 .filter(
209 monthly_usage::Column::UserId
210 .eq(user_id)
211 .and(monthly_usage::Column::ModelId.eq(model.id))
212 .and(monthly_usage::Column::Month.eq(month))
213 .and(monthly_usage::Column::Year.eq(year)),
214 )
215 .one(&*tx)
216 .await?;
217 let lifetime_usage = lifetime_usage::Entity::find()
218 .filter(
219 lifetime_usage::Column::UserId
220 .eq(user_id)
221 .and(lifetime_usage::Column::ModelId.eq(model.id)),
222 )
223 .one(&*tx)
224 .await?;
225
226 let requests_this_minute =
227 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
228 let tokens_this_minute =
229 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
230 let tokens_this_day =
231 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
232 let spending_this_month = if let Some(monthly_usage) = &monthly_usage {
233 calculate_spending(
234 model,
235 monthly_usage.input_tokens as usize,
236 monthly_usage.cache_creation_input_tokens as usize,
237 monthly_usage.cache_read_input_tokens as usize,
238 monthly_usage.output_tokens as usize,
239 )
240 } else {
241 0
242 };
243 let lifetime_spending = if let Some(lifetime_usage) = &lifetime_usage {
244 calculate_spending(
245 model,
246 lifetime_usage.input_tokens as usize,
247 lifetime_usage.cache_creation_input_tokens as usize,
248 lifetime_usage.cache_read_input_tokens as usize,
249 lifetime_usage.output_tokens as usize,
250 )
251 } else {
252 0
253 };
254
255 Ok(Usage {
256 requests_this_minute,
257 tokens_this_minute,
258 tokens_this_day,
259 input_tokens_this_month: monthly_usage
260 .as_ref()
261 .map_or(0, |usage| usage.input_tokens as usize),
262 cache_creation_input_tokens_this_month: monthly_usage
263 .as_ref()
264 .map_or(0, |usage| usage.cache_creation_input_tokens as usize),
265 cache_read_input_tokens_this_month: monthly_usage
266 .as_ref()
267 .map_or(0, |usage| usage.cache_read_input_tokens as usize),
268 output_tokens_this_month: monthly_usage
269 .as_ref()
270 .map_or(0, |usage| usage.output_tokens as usize),
271 spending_this_month,
272 lifetime_spending,
273 })
274 })
275 .await
276 }
277
278 #[allow(clippy::too_many_arguments)]
279 pub async fn record_usage(
280 &self,
281 user_id: UserId,
282 is_staff: bool,
283 provider: LanguageModelProvider,
284 model_name: &str,
285 input_token_count: usize,
286 cache_creation_input_tokens: usize,
287 cache_read_input_tokens: usize,
288 output_token_count: usize,
289 now: DateTimeUtc,
290 ) -> Result<Usage> {
291 self.transaction(|tx| async move {
292 let model = self.model(provider, model_name)?;
293
294 let usages = usage::Entity::find()
295 .filter(
296 usage::Column::UserId
297 .eq(user_id)
298 .and(usage::Column::ModelId.eq(model.id)),
299 )
300 .all(&*tx)
301 .await?;
302
303 let requests_this_minute = self
304 .update_usage_for_measure(
305 user_id,
306 is_staff,
307 model.id,
308 &usages,
309 UsageMeasure::RequestsPerMinute,
310 now,
311 1,
312 &tx,
313 )
314 .await?;
315 let total_token_count = input_token_count
316 + cache_read_input_tokens
317 + cache_creation_input_tokens
318 + output_token_count;
319 let tokens_this_minute = self
320 .update_usage_for_measure(
321 user_id,
322 is_staff,
323 model.id,
324 &usages,
325 UsageMeasure::TokensPerMinute,
326 now,
327 total_token_count,
328 &tx,
329 )
330 .await?;
331 let tokens_this_day = self
332 .update_usage_for_measure(
333 user_id,
334 is_staff,
335 model.id,
336 &usages,
337 UsageMeasure::TokensPerDay,
338 now,
339 total_token_count,
340 &tx,
341 )
342 .await?;
343
344 let month = now.date_naive().month() as i32;
345 let year = now.date_naive().year();
346
347 // Update monthly usage
348 let monthly_usage = monthly_usage::Entity::find()
349 .filter(
350 monthly_usage::Column::UserId
351 .eq(user_id)
352 .and(monthly_usage::Column::ModelId.eq(model.id))
353 .and(monthly_usage::Column::Month.eq(month))
354 .and(monthly_usage::Column::Year.eq(year)),
355 )
356 .one(&*tx)
357 .await?;
358
359 let monthly_usage = match monthly_usage {
360 Some(usage) => {
361 monthly_usage::Entity::update(monthly_usage::ActiveModel {
362 id: ActiveValue::unchanged(usage.id),
363 input_tokens: ActiveValue::set(
364 usage.input_tokens + input_token_count as i64,
365 ),
366 cache_creation_input_tokens: ActiveValue::set(
367 usage.cache_creation_input_tokens + cache_creation_input_tokens as i64,
368 ),
369 cache_read_input_tokens: ActiveValue::set(
370 usage.cache_read_input_tokens + cache_read_input_tokens as i64,
371 ),
372 output_tokens: ActiveValue::set(
373 usage.output_tokens + output_token_count as i64,
374 ),
375 ..Default::default()
376 })
377 .exec(&*tx)
378 .await?
379 }
380 None => {
381 monthly_usage::ActiveModel {
382 user_id: ActiveValue::set(user_id),
383 model_id: ActiveValue::set(model.id),
384 month: ActiveValue::set(month),
385 year: ActiveValue::set(year),
386 input_tokens: ActiveValue::set(input_token_count as i64),
387 cache_creation_input_tokens: ActiveValue::set(
388 cache_creation_input_tokens as i64,
389 ),
390 cache_read_input_tokens: ActiveValue::set(cache_read_input_tokens as i64),
391 output_tokens: ActiveValue::set(output_token_count as i64),
392 ..Default::default()
393 }
394 .insert(&*tx)
395 .await?
396 }
397 };
398
399 let spending_this_month = calculate_spending(
400 model,
401 monthly_usage.input_tokens as usize,
402 monthly_usage.cache_creation_input_tokens as usize,
403 monthly_usage.cache_read_input_tokens as usize,
404 monthly_usage.output_tokens as usize,
405 );
406
407 // Update lifetime usage
408 let lifetime_usage = lifetime_usage::Entity::find()
409 .filter(
410 lifetime_usage::Column::UserId
411 .eq(user_id)
412 .and(lifetime_usage::Column::ModelId.eq(model.id)),
413 )
414 .one(&*tx)
415 .await?;
416
417 let lifetime_usage = match lifetime_usage {
418 Some(usage) => {
419 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
420 id: ActiveValue::unchanged(usage.id),
421 input_tokens: ActiveValue::set(
422 usage.input_tokens + input_token_count as i64,
423 ),
424 cache_creation_input_tokens: ActiveValue::set(
425 usage.cache_creation_input_tokens + cache_creation_input_tokens as i64,
426 ),
427 cache_read_input_tokens: ActiveValue::set(
428 usage.cache_read_input_tokens + cache_read_input_tokens as i64,
429 ),
430 output_tokens: ActiveValue::set(
431 usage.output_tokens + output_token_count as i64,
432 ),
433 ..Default::default()
434 })
435 .exec(&*tx)
436 .await?
437 }
438 None => {
439 lifetime_usage::ActiveModel {
440 user_id: ActiveValue::set(user_id),
441 model_id: ActiveValue::set(model.id),
442 input_tokens: ActiveValue::set(input_token_count as i64),
443 cache_creation_input_tokens: ActiveValue::set(
444 cache_creation_input_tokens as i64,
445 ),
446 cache_read_input_tokens: ActiveValue::set(cache_read_input_tokens as i64),
447 output_tokens: ActiveValue::set(output_token_count as i64),
448 ..Default::default()
449 }
450 .insert(&*tx)
451 .await?
452 }
453 };
454
455 let lifetime_spending = calculate_spending(
456 model,
457 lifetime_usage.input_tokens as usize,
458 lifetime_usage.cache_creation_input_tokens as usize,
459 lifetime_usage.cache_read_input_tokens as usize,
460 lifetime_usage.output_tokens as usize,
461 );
462
463 Ok(Usage {
464 requests_this_minute,
465 tokens_this_minute,
466 tokens_this_day,
467 input_tokens_this_month: monthly_usage.input_tokens as usize,
468 cache_creation_input_tokens_this_month: monthly_usage.cache_creation_input_tokens
469 as usize,
470 cache_read_input_tokens_this_month: monthly_usage.cache_read_input_tokens as usize,
471 output_tokens_this_month: monthly_usage.output_tokens as usize,
472 spending_this_month,
473 lifetime_spending,
474 })
475 })
476 .await
477 }
478
479 /// Returns the active user count for the specified model.
480 pub async fn get_active_user_count(
481 &self,
482 provider: LanguageModelProvider,
483 model_name: &str,
484 now: DateTimeUtc,
485 ) -> Result<ActiveUserCount> {
486 self.transaction(|tx| async move {
487 let minute_since = now - Duration::minutes(5);
488 let day_since = now - Duration::days(5);
489
490 let model = self
491 .models
492 .get(&(provider, model_name.to_string()))
493 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
494
495 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
496
497 let users_in_recent_minutes = usage::Entity::find()
498 .filter(
499 usage::Column::ModelId
500 .eq(model.id)
501 .and(usage::Column::MeasureId.eq(tokens_per_minute))
502 .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
503 .and(usage::Column::IsStaff.eq(false)),
504 )
505 .select_only()
506 .column(usage::Column::UserId)
507 .group_by(usage::Column::UserId)
508 .count(&*tx)
509 .await? as usize;
510
511 let users_in_recent_days = usage::Entity::find()
512 .filter(
513 usage::Column::ModelId
514 .eq(model.id)
515 .and(usage::Column::MeasureId.eq(tokens_per_minute))
516 .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
517 .and(usage::Column::IsStaff.eq(false)),
518 )
519 .select_only()
520 .column(usage::Column::UserId)
521 .group_by(usage::Column::UserId)
522 .count(&*tx)
523 .await? as usize;
524
525 Ok(ActiveUserCount {
526 users_in_recent_minutes,
527 users_in_recent_days,
528 })
529 })
530 .await
531 }
532
533 #[allow(clippy::too_many_arguments)]
534 async fn update_usage_for_measure(
535 &self,
536 user_id: UserId,
537 is_staff: bool,
538 model_id: ModelId,
539 usages: &[usage::Model],
540 usage_measure: UsageMeasure,
541 now: DateTimeUtc,
542 usage_to_add: usize,
543 tx: &DatabaseTransaction,
544 ) -> Result<usize> {
545 let now = now.naive_utc();
546 let measure_id = *self
547 .usage_measure_ids
548 .get(&usage_measure)
549 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
550
551 let mut id = None;
552 let mut timestamp = now;
553 let mut buckets = vec![0_i64];
554
555 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
556 id = Some(old_usage.id);
557 let (live_buckets, buckets_since) =
558 Self::get_live_buckets(old_usage, now, usage_measure);
559 if !live_buckets.is_empty() {
560 buckets.clear();
561 buckets.extend_from_slice(live_buckets);
562 buckets.extend(iter::repeat(0).take(buckets_since));
563 timestamp =
564 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
565 }
566 }
567
568 *buckets.last_mut().unwrap() += usage_to_add as i64;
569 let total_usage = buckets.iter().sum::<i64>() as usize;
570
571 let mut model = usage::ActiveModel {
572 user_id: ActiveValue::set(user_id),
573 is_staff: ActiveValue::set(is_staff),
574 model_id: ActiveValue::set(model_id),
575 measure_id: ActiveValue::set(measure_id),
576 timestamp: ActiveValue::set(timestamp),
577 buckets: ActiveValue::set(buckets),
578 ..Default::default()
579 };
580
581 if let Some(id) = id {
582 model.id = ActiveValue::unchanged(id);
583 model.update(tx).await?;
584 } else {
585 usage::Entity::insert(model)
586 .exec_without_returning(tx)
587 .await?;
588 }
589
590 Ok(total_usage)
591 }
592
593 fn get_usage_for_measure(
594 &self,
595 usages: &[usage::Model],
596 now: DateTimeUtc,
597 usage_measure: UsageMeasure,
598 ) -> Result<usize> {
599 let now = now.naive_utc();
600 let measure_id = *self
601 .usage_measure_ids
602 .get(&usage_measure)
603 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
604 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
605 return Ok(0);
606 };
607
608 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
609 Ok(live_buckets.iter().sum::<i64>() as _)
610 }
611
612 fn get_live_buckets(
613 usage: &usage::Model,
614 now: chrono::NaiveDateTime,
615 measure: UsageMeasure,
616 ) -> (&[i64], usize) {
617 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
618 let buckets_since_usage =
619 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
620 let buckets_since_usage = buckets_since_usage.ceil() as usize;
621 let mut live_buckets = &[] as &[i64];
622 if buckets_since_usage < measure.bucket_count() {
623 let expired_bucket_count =
624 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
625 live_buckets = &usage.buckets[expired_bucket_count..];
626 while live_buckets.first() == Some(&0) {
627 live_buckets = &live_buckets[1..];
628 }
629 }
630 (live_buckets, buckets_since_usage)
631 }
632}
633
634fn calculate_spending(
635 model: &model::Model,
636 input_tokens_this_month: usize,
637 cache_creation_input_tokens_this_month: usize,
638 cache_read_input_tokens_this_month: usize,
639 output_tokens_this_month: usize,
640) -> usize {
641 let input_token_cost =
642 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
643 let cache_creation_input_token_cost = cache_creation_input_tokens_this_month
644 * model.price_per_million_cache_creation_input_tokens as usize
645 / 1_000_000;
646 let cache_read_input_token_cost = cache_read_input_tokens_this_month
647 * model.price_per_million_cache_read_input_tokens as usize
648 / 1_000_000;
649 let output_token_cost =
650 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
651 input_token_cost
652 + cache_creation_input_token_cost
653 + cache_read_input_token_cost
654 + output_token_cost
655}
656
657const MINUTE_BUCKET_COUNT: usize = 12;
658const DAY_BUCKET_COUNT: usize = 48;
659
660impl UsageMeasure {
661 fn bucket_count(&self) -> usize {
662 match self {
663 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
664 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
665 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
666 }
667 }
668
669 fn total_duration(&self) -> Duration {
670 match self {
671 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
672 UsageMeasure::TokensPerMinute => Duration::minutes(1),
673 UsageMeasure::TokensPerDay => Duration::hours(24),
674 }
675 }
676
677 fn bucket_duration(&self) -> Duration {
678 self.total_duration() / self.bucket_count() as i32
679 }
680}