1use crate::db::UserId;
2use chrono::Duration;
3use futures::StreamExt as _;
4use rpc::LanguageModelProvider;
5use sea_orm::QuerySelect;
6use std::{iter, str::FromStr};
7use strum::IntoEnumIterator as _;
8
9use super::*;
10
/// Per-user, per-model usage totals, as produced by `LlmDatabase::get_usage`
/// and `LlmDatabase::record_usage`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Usage {
    /// Sum of the live buckets of the `RequestsPerMinute` measure.
    pub requests_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerMinute` measure.
    pub tokens_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerDay` measure.
    pub tokens_this_day: usize,
    /// Sum of the live buckets of the `InputTokensPerMonth` measure.
    pub input_tokens_this_month: usize,
    /// Sum of the live buckets of the `OutputTokensPerMonth` measure.
    pub output_tokens_this_month: usize,
    /// Result of `calculate_spending` applied to the two monthly token totals.
    pub spending_this_month: usize,
    /// Result of `calculate_spending` applied to the lifetime token totals.
    pub lifetime_spending: usize,
}
21
22#[derive(Debug, PartialEq, Clone)]
23pub struct ApplicationWideUsage {
24 pub provider: LanguageModelProvider,
25 pub model: String,
26 pub requests_this_minute: usize,
27 pub tokens_this_minute: usize,
28}
29
/// User counts returned by `LlmDatabase::get_active_user_count`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ActiveUserCount {
    /// Count of user-id groups among non-staff usage rows stamped within the
    /// five minutes before the reference time.
    pub users_in_recent_minutes: usize,
    /// Count of user-id groups among non-staff usage rows stamped within the
    /// five days before the reference time.
    pub users_in_recent_days: usize,
}
35
36impl LlmDatabase {
37 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
38 let all_measures = self
39 .transaction(|tx| async move {
40 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
41
42 let new_measures = UsageMeasure::iter()
43 .filter(|measure| {
44 !existing_measures
45 .iter()
46 .any(|m| m.name == measure.to_string())
47 })
48 .map(|measure| usage_measure::ActiveModel {
49 name: ActiveValue::set(measure.to_string()),
50 ..Default::default()
51 })
52 .collect::<Vec<_>>();
53
54 if !new_measures.is_empty() {
55 usage_measure::Entity::insert_many(new_measures)
56 .exec(&*tx)
57 .await?;
58 }
59
60 Ok(usage_measure::Entity::find().all(&*tx).await?)
61 })
62 .await?;
63
64 self.usage_measure_ids = all_measures
65 .into_iter()
66 .filter_map(|measure| {
67 UsageMeasure::from_str(&measure.name)
68 .ok()
69 .map(|um| (um, measure.id))
70 })
71 .collect();
72 Ok(())
73 }
74
75 pub async fn get_application_wide_usages_by_model(
76 &self,
77 now: DateTimeUtc,
78 ) -> Result<Vec<ApplicationWideUsage>> {
79 self.transaction(|tx| async move {
80 let past_minute = now - Duration::minutes(1);
81 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
82 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
83
84 let mut results = Vec::new();
85 for ((provider, model_name), model) in self.models.iter() {
86 let mut usages = usage::Entity::find()
87 .filter(
88 usage::Column::Timestamp
89 .gte(past_minute.naive_utc())
90 .and(usage::Column::IsStaff.eq(false))
91 .and(usage::Column::ModelId.eq(model.id))
92 .and(
93 usage::Column::MeasureId
94 .eq(requests_per_minute)
95 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
96 ),
97 )
98 .stream(&*tx)
99 .await?;
100
101 let mut requests_this_minute = 0;
102 let mut tokens_this_minute = 0;
103 while let Some(usage) = usages.next().await {
104 let usage = usage?;
105 if usage.measure_id == requests_per_minute {
106 requests_this_minute += Self::get_live_buckets(
107 &usage,
108 now.naive_utc(),
109 UsageMeasure::RequestsPerMinute,
110 )
111 .0
112 .iter()
113 .copied()
114 .sum::<i64>() as usize;
115 } else if usage.measure_id == tokens_per_minute {
116 tokens_this_minute += Self::get_live_buckets(
117 &usage,
118 now.naive_utc(),
119 UsageMeasure::TokensPerMinute,
120 )
121 .0
122 .iter()
123 .copied()
124 .sum::<i64>() as usize;
125 }
126 }
127
128 results.push(ApplicationWideUsage {
129 provider: *provider,
130 model: model_name.clone(),
131 requests_this_minute,
132 tokens_this_minute,
133 })
134 }
135
136 Ok(results)
137 })
138 .await
139 }
140
141 pub async fn get_usage(
142 &self,
143 user_id: UserId,
144 provider: LanguageModelProvider,
145 model_name: &str,
146 now: DateTimeUtc,
147 ) -> Result<Usage> {
148 self.transaction(|tx| async move {
149 let model = self
150 .models
151 .get(&(provider, model_name.to_string()))
152 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
153
154 let usages = usage::Entity::find()
155 .filter(
156 usage::Column::UserId
157 .eq(user_id)
158 .and(usage::Column::ModelId.eq(model.id)),
159 )
160 .all(&*tx)
161 .await?;
162
163 let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
164 .filter(
165 lifetime_usage::Column::UserId
166 .eq(user_id)
167 .and(lifetime_usage::Column::ModelId.eq(model.id)),
168 )
169 .one(&*tx)
170 .await?
171 .map_or((0, 0), |usage| {
172 (usage.input_tokens as usize, usage.output_tokens as usize)
173 });
174
175 let requests_this_minute =
176 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
177 let tokens_this_minute =
178 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
179 let tokens_this_day =
180 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
181 let input_tokens_this_month =
182 self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
183 let output_tokens_this_month =
184 self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
185 let spending_this_month =
186 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
187 let lifetime_spending =
188 calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
189
190 Ok(Usage {
191 requests_this_minute,
192 tokens_this_minute,
193 tokens_this_day,
194 input_tokens_this_month,
195 output_tokens_this_month,
196 spending_this_month,
197 lifetime_spending,
198 })
199 })
200 .await
201 }
202
203 #[allow(clippy::too_many_arguments)]
204 pub async fn record_usage(
205 &self,
206 user_id: UserId,
207 is_staff: bool,
208 provider: LanguageModelProvider,
209 model_name: &str,
210 input_token_count: usize,
211 output_token_count: usize,
212 now: DateTimeUtc,
213 ) -> Result<Usage> {
214 self.transaction(|tx| async move {
215 let model = self.model(provider, model_name)?;
216
217 let usages = usage::Entity::find()
218 .filter(
219 usage::Column::UserId
220 .eq(user_id)
221 .and(usage::Column::ModelId.eq(model.id)),
222 )
223 .all(&*tx)
224 .await?;
225
226 let requests_this_minute = self
227 .update_usage_for_measure(
228 user_id,
229 is_staff,
230 model.id,
231 &usages,
232 UsageMeasure::RequestsPerMinute,
233 now,
234 1,
235 &tx,
236 )
237 .await?;
238 let tokens_this_minute = self
239 .update_usage_for_measure(
240 user_id,
241 is_staff,
242 model.id,
243 &usages,
244 UsageMeasure::TokensPerMinute,
245 now,
246 input_token_count + output_token_count,
247 &tx,
248 )
249 .await?;
250 let tokens_this_day = self
251 .update_usage_for_measure(
252 user_id,
253 is_staff,
254 model.id,
255 &usages,
256 UsageMeasure::TokensPerDay,
257 now,
258 input_token_count + output_token_count,
259 &tx,
260 )
261 .await?;
262 let input_tokens_this_month = self
263 .update_usage_for_measure(
264 user_id,
265 is_staff,
266 model.id,
267 &usages,
268 UsageMeasure::InputTokensPerMonth,
269 now,
270 input_token_count,
271 &tx,
272 )
273 .await?;
274 let output_tokens_this_month = self
275 .update_usage_for_measure(
276 user_id,
277 is_staff,
278 model.id,
279 &usages,
280 UsageMeasure::OutputTokensPerMonth,
281 now,
282 output_token_count,
283 &tx,
284 )
285 .await?;
286 let spending_this_month =
287 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
288
289 // Update lifetime usage
290 let lifetime_usage = lifetime_usage::Entity::find()
291 .filter(
292 lifetime_usage::Column::UserId
293 .eq(user_id)
294 .and(lifetime_usage::Column::ModelId.eq(model.id)),
295 )
296 .one(&*tx)
297 .await?;
298
299 let lifetime_usage = match lifetime_usage {
300 Some(usage) => {
301 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
302 id: ActiveValue::unchanged(usage.id),
303 input_tokens: ActiveValue::set(
304 usage.input_tokens + input_token_count as i64,
305 ),
306 output_tokens: ActiveValue::set(
307 usage.output_tokens + output_token_count as i64,
308 ),
309 ..Default::default()
310 })
311 .exec(&*tx)
312 .await?
313 }
314 None => {
315 lifetime_usage::ActiveModel {
316 user_id: ActiveValue::set(user_id),
317 model_id: ActiveValue::set(model.id),
318 input_tokens: ActiveValue::set(input_token_count as i64),
319 output_tokens: ActiveValue::set(output_token_count as i64),
320 ..Default::default()
321 }
322 .insert(&*tx)
323 .await?
324 }
325 };
326
327 let lifetime_spending = calculate_spending(
328 model,
329 lifetime_usage.input_tokens as usize,
330 lifetime_usage.output_tokens as usize,
331 );
332
333 Ok(Usage {
334 requests_this_minute,
335 tokens_this_minute,
336 tokens_this_day,
337 input_tokens_this_month,
338 output_tokens_this_month,
339 spending_this_month,
340 lifetime_spending,
341 })
342 })
343 .await
344 }
345
346 pub async fn get_active_user_count(&self, now: DateTimeUtc) -> Result<ActiveUserCount> {
347 self.transaction(|tx| async move {
348 let minute_since = now - Duration::minutes(5);
349 let day_since = now - Duration::days(5);
350
351 let users_in_recent_minutes = usage::Entity::find()
352 .filter(
353 usage::Column::Timestamp
354 .gte(minute_since.naive_utc())
355 .and(usage::Column::IsStaff.eq(false)),
356 )
357 .select_only()
358 .column(usage::Column::UserId)
359 .group_by(usage::Column::UserId)
360 .count(&*tx)
361 .await? as usize;
362
363 let users_in_recent_days = usage::Entity::find()
364 .filter(
365 usage::Column::Timestamp
366 .gte(day_since.naive_utc())
367 .and(usage::Column::IsStaff.eq(false)),
368 )
369 .select_only()
370 .column(usage::Column::UserId)
371 .group_by(usage::Column::UserId)
372 .count(&*tx)
373 .await? as usize;
374
375 Ok(ActiveUserCount {
376 users_in_recent_minutes,
377 users_in_recent_days,
378 })
379 })
380 .await
381 }
382
383 #[allow(clippy::too_many_arguments)]
384 async fn update_usage_for_measure(
385 &self,
386 user_id: UserId,
387 is_staff: bool,
388 model_id: ModelId,
389 usages: &[usage::Model],
390 usage_measure: UsageMeasure,
391 now: DateTimeUtc,
392 usage_to_add: usize,
393 tx: &DatabaseTransaction,
394 ) -> Result<usize> {
395 let now = now.naive_utc();
396 let measure_id = *self
397 .usage_measure_ids
398 .get(&usage_measure)
399 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
400
401 let mut id = None;
402 let mut timestamp = now;
403 let mut buckets = vec![0_i64];
404
405 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
406 id = Some(old_usage.id);
407 let (live_buckets, buckets_since) =
408 Self::get_live_buckets(old_usage, now, usage_measure);
409 if !live_buckets.is_empty() {
410 buckets.clear();
411 buckets.extend_from_slice(live_buckets);
412 buckets.extend(iter::repeat(0).take(buckets_since));
413 timestamp =
414 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
415 }
416 }
417
418 *buckets.last_mut().unwrap() += usage_to_add as i64;
419 let total_usage = buckets.iter().sum::<i64>() as usize;
420
421 let mut model = usage::ActiveModel {
422 user_id: ActiveValue::set(user_id),
423 is_staff: ActiveValue::set(is_staff),
424 model_id: ActiveValue::set(model_id),
425 measure_id: ActiveValue::set(measure_id),
426 timestamp: ActiveValue::set(timestamp),
427 buckets: ActiveValue::set(buckets),
428 ..Default::default()
429 };
430
431 if let Some(id) = id {
432 model.id = ActiveValue::unchanged(id);
433 model.update(tx).await?;
434 } else {
435 usage::Entity::insert(model)
436 .exec_without_returning(tx)
437 .await?;
438 }
439
440 Ok(total_usage)
441 }
442
443 fn get_usage_for_measure(
444 &self,
445 usages: &[usage::Model],
446 now: DateTimeUtc,
447 usage_measure: UsageMeasure,
448 ) -> Result<usize> {
449 let now = now.naive_utc();
450 let measure_id = *self
451 .usage_measure_ids
452 .get(&usage_measure)
453 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
454 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
455 return Ok(0);
456 };
457
458 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
459 Ok(live_buckets.iter().sum::<i64>() as _)
460 }
461
462 fn get_live_buckets(
463 usage: &usage::Model,
464 now: chrono::NaiveDateTime,
465 measure: UsageMeasure,
466 ) -> (&[i64], usize) {
467 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
468 let buckets_since_usage =
469 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
470 let buckets_since_usage = buckets_since_usage.ceil() as usize;
471 let mut live_buckets = &[] as &[i64];
472 if buckets_since_usage < measure.bucket_count() {
473 let expired_bucket_count =
474 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
475 live_buckets = &usage.buckets[expired_bucket_count..];
476 while live_buckets.first() == Some(&0) {
477 live_buckets = &live_buckets[1..];
478 }
479 }
480 (live_buckets, buckets_since_usage)
481 }
482}
483
484fn calculate_spending(
485 model: &model::Model,
486 input_tokens_this_month: usize,
487 output_tokens_this_month: usize,
488) -> usize {
489 let input_token_cost =
490 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
491 let output_token_cost =
492 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
493 input_token_cost + output_token_cost
494}
495
496const MINUTE_BUCKET_COUNT: usize = 12;
497const DAY_BUCKET_COUNT: usize = 48;
498const MONTH_BUCKET_COUNT: usize = 30;
499
500impl UsageMeasure {
501 fn bucket_count(&self) -> usize {
502 match self {
503 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
504 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
505 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
506 UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
507 UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
508 }
509 }
510
511 fn total_duration(&self) -> Duration {
512 match self {
513 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
514 UsageMeasure::TokensPerMinute => Duration::minutes(1),
515 UsageMeasure::TokensPerDay => Duration::hours(24),
516 UsageMeasure::InputTokensPerMonth => Duration::days(30),
517 UsageMeasure::OutputTokensPerMonth => Duration::days(30),
518 }
519 }
520
521 fn bucket_duration(&self) -> Duration {
522 self.total_duration() / self.bucket_count() as i32
523 }
524}