1use crate::db::UserId;
2use chrono::Duration;
3use futures::StreamExt as _;
4use rpc::LanguageModelProvider;
5use sea_orm::QuerySelect;
6use std::{iter, str::FromStr};
7use strum::IntoEnumIterator as _;
8
9use super::*;
10
/// Usage totals for one user on one model, as returned by
/// `LlmDatabase::get_usage` and `LlmDatabase::record_usage`.
///
/// The per-window counters are sums over the buckets that are still live
/// for the corresponding `UsageMeasure`; the spending figures are derived
/// from token counts via `calculate_spending`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Usage {
    /// Requests counted by the `RequestsPerMinute` measure.
    pub requests_this_minute: usize,
    /// Tokens counted by the `TokensPerMinute` measure.
    pub tokens_this_minute: usize,
    /// Tokens counted by the `TokensPerDay` measure.
    pub tokens_this_day: usize,
    /// Input tokens counted by the `InputTokensPerMonth` measure.
    pub input_tokens_this_month: usize,
    /// Output tokens counted by the `OutputTokensPerMonth` measure.
    pub output_tokens_this_month: usize,
    /// Cost of this month's input and output tokens at the model's prices.
    pub spending_this_month: usize,
    /// Cost of all recorded lifetime input and output tokens at the model's prices.
    pub lifetime_spending: usize,
}
21
22#[derive(Debug, PartialEq, Clone)]
23pub struct ApplicationWideUsage {
24 pub provider: LanguageModelProvider,
25 pub model: String,
26 pub requests_this_minute: usize,
27 pub tokens_this_minute: usize,
28}
29
/// Number of distinct users recently active on a model, as produced by
/// `LlmDatabase::get_active_user_count`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ActiveUserCount {
    /// Users with matching usage in the shorter (minutes-scale) window.
    pub users_in_recent_minutes: usize,
    /// Users with matching usage in the longer (days-scale) window.
    pub users_in_recent_days: usize,
}
35
36impl LlmDatabase {
37 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
38 let all_measures = self
39 .transaction(|tx| async move {
40 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
41
42 let new_measures = UsageMeasure::iter()
43 .filter(|measure| {
44 !existing_measures
45 .iter()
46 .any(|m| m.name == measure.to_string())
47 })
48 .map(|measure| usage_measure::ActiveModel {
49 name: ActiveValue::set(measure.to_string()),
50 ..Default::default()
51 })
52 .collect::<Vec<_>>();
53
54 if !new_measures.is_empty() {
55 usage_measure::Entity::insert_many(new_measures)
56 .exec(&*tx)
57 .await?;
58 }
59
60 Ok(usage_measure::Entity::find().all(&*tx).await?)
61 })
62 .await?;
63
64 self.usage_measure_ids = all_measures
65 .into_iter()
66 .filter_map(|measure| {
67 UsageMeasure::from_str(&measure.name)
68 .ok()
69 .map(|um| (um, measure.id))
70 })
71 .collect();
72 Ok(())
73 }
74
75 pub async fn get_application_wide_usages_by_model(
76 &self,
77 now: DateTimeUtc,
78 ) -> Result<Vec<ApplicationWideUsage>> {
79 self.transaction(|tx| async move {
80 let past_minute = now - Duration::minutes(1);
81 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
82 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
83
84 let mut results = Vec::new();
85 for ((provider, model_name), model) in self.models.iter() {
86 let mut usages = usage::Entity::find()
87 .filter(
88 usage::Column::Timestamp
89 .gte(past_minute.naive_utc())
90 .and(usage::Column::IsStaff.eq(false))
91 .and(usage::Column::ModelId.eq(model.id))
92 .and(
93 usage::Column::MeasureId
94 .eq(requests_per_minute)
95 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
96 ),
97 )
98 .stream(&*tx)
99 .await?;
100
101 let mut requests_this_minute = 0;
102 let mut tokens_this_minute = 0;
103 while let Some(usage) = usages.next().await {
104 let usage = usage?;
105 if usage.measure_id == requests_per_minute {
106 requests_this_minute += Self::get_live_buckets(
107 &usage,
108 now.naive_utc(),
109 UsageMeasure::RequestsPerMinute,
110 )
111 .0
112 .iter()
113 .copied()
114 .sum::<i64>() as usize;
115 } else if usage.measure_id == tokens_per_minute {
116 tokens_this_minute += Self::get_live_buckets(
117 &usage,
118 now.naive_utc(),
119 UsageMeasure::TokensPerMinute,
120 )
121 .0
122 .iter()
123 .copied()
124 .sum::<i64>() as usize;
125 }
126 }
127
128 results.push(ApplicationWideUsage {
129 provider: *provider,
130 model: model_name.clone(),
131 requests_this_minute,
132 tokens_this_minute,
133 })
134 }
135
136 Ok(results)
137 })
138 .await
139 }
140
141 pub async fn get_usage(
142 &self,
143 user_id: UserId,
144 provider: LanguageModelProvider,
145 model_name: &str,
146 now: DateTimeUtc,
147 ) -> Result<Usage> {
148 self.transaction(|tx| async move {
149 let model = self
150 .models
151 .get(&(provider, model_name.to_string()))
152 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
153
154 let usages = usage::Entity::find()
155 .filter(
156 usage::Column::UserId
157 .eq(user_id)
158 .and(usage::Column::ModelId.eq(model.id)),
159 )
160 .all(&*tx)
161 .await?;
162
163 let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
164 .filter(
165 lifetime_usage::Column::UserId
166 .eq(user_id)
167 .and(lifetime_usage::Column::ModelId.eq(model.id)),
168 )
169 .one(&*tx)
170 .await?
171 .map_or((0, 0), |usage| {
172 (usage.input_tokens as usize, usage.output_tokens as usize)
173 });
174
175 let requests_this_minute =
176 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
177 let tokens_this_minute =
178 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
179 let tokens_this_day =
180 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
181 let input_tokens_this_month =
182 self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
183 let output_tokens_this_month =
184 self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
185 let spending_this_month =
186 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
187 let lifetime_spending =
188 calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
189
190 Ok(Usage {
191 requests_this_minute,
192 tokens_this_minute,
193 tokens_this_day,
194 input_tokens_this_month,
195 output_tokens_this_month,
196 spending_this_month,
197 lifetime_spending,
198 })
199 })
200 .await
201 }
202
203 #[allow(clippy::too_many_arguments)]
204 pub async fn record_usage(
205 &self,
206 user_id: UserId,
207 is_staff: bool,
208 provider: LanguageModelProvider,
209 model_name: &str,
210 input_token_count: usize,
211 output_token_count: usize,
212 now: DateTimeUtc,
213 ) -> Result<Usage> {
214 self.transaction(|tx| async move {
215 let model = self.model(provider, model_name)?;
216
217 let usages = usage::Entity::find()
218 .filter(
219 usage::Column::UserId
220 .eq(user_id)
221 .and(usage::Column::ModelId.eq(model.id)),
222 )
223 .all(&*tx)
224 .await?;
225
226 let requests_this_minute = self
227 .update_usage_for_measure(
228 user_id,
229 is_staff,
230 model.id,
231 &usages,
232 UsageMeasure::RequestsPerMinute,
233 now,
234 1,
235 &tx,
236 )
237 .await?;
238 let tokens_this_minute = self
239 .update_usage_for_measure(
240 user_id,
241 is_staff,
242 model.id,
243 &usages,
244 UsageMeasure::TokensPerMinute,
245 now,
246 input_token_count + output_token_count,
247 &tx,
248 )
249 .await?;
250 let tokens_this_day = self
251 .update_usage_for_measure(
252 user_id,
253 is_staff,
254 model.id,
255 &usages,
256 UsageMeasure::TokensPerDay,
257 now,
258 input_token_count + output_token_count,
259 &tx,
260 )
261 .await?;
262 let input_tokens_this_month = self
263 .update_usage_for_measure(
264 user_id,
265 is_staff,
266 model.id,
267 &usages,
268 UsageMeasure::InputTokensPerMonth,
269 now,
270 input_token_count,
271 &tx,
272 )
273 .await?;
274 let output_tokens_this_month = self
275 .update_usage_for_measure(
276 user_id,
277 is_staff,
278 model.id,
279 &usages,
280 UsageMeasure::OutputTokensPerMonth,
281 now,
282 output_token_count,
283 &tx,
284 )
285 .await?;
286 let spending_this_month =
287 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
288
289 // Update lifetime usage
290 let lifetime_usage = lifetime_usage::Entity::find()
291 .filter(
292 lifetime_usage::Column::UserId
293 .eq(user_id)
294 .and(lifetime_usage::Column::ModelId.eq(model.id)),
295 )
296 .one(&*tx)
297 .await?;
298
299 let lifetime_usage = match lifetime_usage {
300 Some(usage) => {
301 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
302 id: ActiveValue::unchanged(usage.id),
303 input_tokens: ActiveValue::set(
304 usage.input_tokens + input_token_count as i64,
305 ),
306 output_tokens: ActiveValue::set(
307 usage.output_tokens + output_token_count as i64,
308 ),
309 ..Default::default()
310 })
311 .exec(&*tx)
312 .await?
313 }
314 None => {
315 lifetime_usage::ActiveModel {
316 user_id: ActiveValue::set(user_id),
317 model_id: ActiveValue::set(model.id),
318 input_tokens: ActiveValue::set(input_token_count as i64),
319 output_tokens: ActiveValue::set(output_token_count as i64),
320 ..Default::default()
321 }
322 .insert(&*tx)
323 .await?
324 }
325 };
326
327 let lifetime_spending = calculate_spending(
328 model,
329 lifetime_usage.input_tokens as usize,
330 lifetime_usage.output_tokens as usize,
331 );
332
333 Ok(Usage {
334 requests_this_minute,
335 tokens_this_minute,
336 tokens_this_day,
337 input_tokens_this_month,
338 output_tokens_this_month,
339 spending_this_month,
340 lifetime_spending,
341 })
342 })
343 .await
344 }
345
346 /// Returns the active user count for the specified model.
347 pub async fn get_active_user_count(
348 &self,
349 provider: LanguageModelProvider,
350 model_name: &str,
351 now: DateTimeUtc,
352 ) -> Result<ActiveUserCount> {
353 self.transaction(|tx| async move {
354 let minute_since = now - Duration::minutes(5);
355 let day_since = now - Duration::days(5);
356
357 let model = self
358 .models
359 .get(&(provider, model_name.to_string()))
360 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
361
362 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
363
364 let users_in_recent_minutes = usage::Entity::find()
365 .filter(
366 usage::Column::ModelId
367 .eq(model.id)
368 .and(usage::Column::MeasureId.eq(tokens_per_minute))
369 .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
370 .and(usage::Column::IsStaff.eq(false)),
371 )
372 .select_only()
373 .column(usage::Column::UserId)
374 .group_by(usage::Column::UserId)
375 .count(&*tx)
376 .await? as usize;
377
378 let users_in_recent_days = usage::Entity::find()
379 .filter(
380 usage::Column::ModelId
381 .eq(model.id)
382 .and(usage::Column::MeasureId.eq(tokens_per_minute))
383 .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
384 .and(usage::Column::IsStaff.eq(false)),
385 )
386 .select_only()
387 .column(usage::Column::UserId)
388 .group_by(usage::Column::UserId)
389 .count(&*tx)
390 .await? as usize;
391
392 Ok(ActiveUserCount {
393 users_in_recent_minutes,
394 users_in_recent_days,
395 })
396 })
397 .await
398 }
399
400 #[allow(clippy::too_many_arguments)]
401 async fn update_usage_for_measure(
402 &self,
403 user_id: UserId,
404 is_staff: bool,
405 model_id: ModelId,
406 usages: &[usage::Model],
407 usage_measure: UsageMeasure,
408 now: DateTimeUtc,
409 usage_to_add: usize,
410 tx: &DatabaseTransaction,
411 ) -> Result<usize> {
412 let now = now.naive_utc();
413 let measure_id = *self
414 .usage_measure_ids
415 .get(&usage_measure)
416 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
417
418 let mut id = None;
419 let mut timestamp = now;
420 let mut buckets = vec![0_i64];
421
422 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
423 id = Some(old_usage.id);
424 let (live_buckets, buckets_since) =
425 Self::get_live_buckets(old_usage, now, usage_measure);
426 if !live_buckets.is_empty() {
427 buckets.clear();
428 buckets.extend_from_slice(live_buckets);
429 buckets.extend(iter::repeat(0).take(buckets_since));
430 timestamp =
431 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
432 }
433 }
434
435 *buckets.last_mut().unwrap() += usage_to_add as i64;
436 let total_usage = buckets.iter().sum::<i64>() as usize;
437
438 let mut model = usage::ActiveModel {
439 user_id: ActiveValue::set(user_id),
440 is_staff: ActiveValue::set(is_staff),
441 model_id: ActiveValue::set(model_id),
442 measure_id: ActiveValue::set(measure_id),
443 timestamp: ActiveValue::set(timestamp),
444 buckets: ActiveValue::set(buckets),
445 ..Default::default()
446 };
447
448 if let Some(id) = id {
449 model.id = ActiveValue::unchanged(id);
450 model.update(tx).await?;
451 } else {
452 usage::Entity::insert(model)
453 .exec_without_returning(tx)
454 .await?;
455 }
456
457 Ok(total_usage)
458 }
459
460 fn get_usage_for_measure(
461 &self,
462 usages: &[usage::Model],
463 now: DateTimeUtc,
464 usage_measure: UsageMeasure,
465 ) -> Result<usize> {
466 let now = now.naive_utc();
467 let measure_id = *self
468 .usage_measure_ids
469 .get(&usage_measure)
470 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
471 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
472 return Ok(0);
473 };
474
475 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
476 Ok(live_buckets.iter().sum::<i64>() as _)
477 }
478
479 fn get_live_buckets(
480 usage: &usage::Model,
481 now: chrono::NaiveDateTime,
482 measure: UsageMeasure,
483 ) -> (&[i64], usize) {
484 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
485 let buckets_since_usage =
486 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
487 let buckets_since_usage = buckets_since_usage.ceil() as usize;
488 let mut live_buckets = &[] as &[i64];
489 if buckets_since_usage < measure.bucket_count() {
490 let expired_bucket_count =
491 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
492 live_buckets = &usage.buckets[expired_bucket_count..];
493 while live_buckets.first() == Some(&0) {
494 live_buckets = &live_buckets[1..];
495 }
496 }
497 (live_buckets, buckets_since_usage)
498 }
499}
500
501fn calculate_spending(
502 model: &model::Model,
503 input_tokens_this_month: usize,
504 output_tokens_this_month: usize,
505) -> usize {
506 let input_token_cost =
507 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
508 let output_token_cost =
509 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
510 input_token_cost + output_token_cost
511}
512
/// Number of buckets a per-minute measure's window is divided into.
const MINUTE_BUCKET_COUNT: usize = 12;

/// Number of buckets the per-day measure's window is divided into.
const DAY_BUCKET_COUNT: usize = 48;

/// Number of buckets a per-month measure's window is divided into.
const MONTH_BUCKET_COUNT: usize = 30;
516
517impl UsageMeasure {
518 fn bucket_count(&self) -> usize {
519 match self {
520 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
521 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
522 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
523 UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
524 UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
525 }
526 }
527
528 fn total_duration(&self) -> Duration {
529 match self {
530 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
531 UsageMeasure::TokensPerMinute => Duration::minutes(1),
532 UsageMeasure::TokensPerDay => Duration::hours(24),
533 UsageMeasure::InputTokensPerMonth => Duration::days(30),
534 UsageMeasure::OutputTokensPerMonth => Duration::days(30),
535 }
536 }
537
538 fn bucket_duration(&self) -> Duration {
539 self.total_duration() / self.bucket_count() as i32
540 }
541}