1use crate::db::UserId;
2use chrono::Duration;
3use futures::StreamExt as _;
4use rpc::LanguageModelProvider;
5use sea_orm::QuerySelect;
6use std::{iter, str::FromStr};
7use strum::IntoEnumIterator as _;
8
9use super::*;
10
/// Per-user, per-model usage totals as returned by `LlmDatabase::get_usage`
/// and `LlmDatabase::record_usage`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Usage {
    /// Sum of the live buckets of the `RequestsPerMinute` measure.
    pub requests_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerMinute` measure.
    pub tokens_this_minute: usize,
    /// Sum of the live buckets of the `TokensPerDay` measure.
    pub tokens_this_day: usize,
    /// Sum of the live buckets of the `InputTokensPerMonth` measure.
    pub input_tokens_this_month: usize,
    /// Sum of the live buckets of the `OutputTokensPerMonth` measure.
    pub output_tokens_this_month: usize,
    /// Cost derived from the two monthly token totals and the model's
    /// per-million-token prices (same unit as those prices).
    pub spending_this_month: usize,
    /// Cost derived from the lifetime token totals and the model's
    /// per-million-token prices (same unit as those prices).
    pub lifetime_spending: usize,
}
21
22#[derive(Debug, PartialEq, Clone)]
23pub struct ApplicationWideUsage {
24 pub provider: LanguageModelProvider,
25 pub model: String,
26 pub requests_this_minute: usize,
27 pub tokens_this_minute: usize,
28}
29
/// Counts of recently active non-staff users, as produced by
/// `LlmDatabase::get_active_user_count`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ActiveUserCount {
    /// Users with a usage row timestamped within the last five minutes.
    pub users_in_recent_minutes: usize,
    /// Users with a usage row timestamped within the last five days.
    pub users_in_recent_days: usize,
}
35
36impl LlmDatabase {
37 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
38 let all_measures = self
39 .transaction(|tx| async move {
40 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
41
42 let new_measures = UsageMeasure::iter()
43 .filter(|measure| {
44 !existing_measures
45 .iter()
46 .any(|m| m.name == measure.to_string())
47 })
48 .map(|measure| usage_measure::ActiveModel {
49 name: ActiveValue::set(measure.to_string()),
50 ..Default::default()
51 })
52 .collect::<Vec<_>>();
53
54 if !new_measures.is_empty() {
55 usage_measure::Entity::insert_many(new_measures)
56 .exec(&*tx)
57 .await?;
58 }
59
60 Ok(usage_measure::Entity::find().all(&*tx).await?)
61 })
62 .await?;
63
64 self.usage_measure_ids = all_measures
65 .into_iter()
66 .filter_map(|measure| {
67 UsageMeasure::from_str(&measure.name)
68 .ok()
69 .map(|um| (um, measure.id))
70 })
71 .collect();
72 Ok(())
73 }
74
75 pub async fn get_application_wide_usages_by_model(
76 &self,
77 now: DateTimeUtc,
78 ) -> Result<Vec<ApplicationWideUsage>> {
79 self.transaction(|tx| async move {
80 let past_minute = now - Duration::minutes(1);
81 let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
82 let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
83
84 let mut results = Vec::new();
85 for (provider, model) in self.models.keys().cloned() {
86 let mut usages = usage::Entity::find()
87 .filter(
88 usage::Column::Timestamp
89 .gte(past_minute.naive_utc())
90 .and(usage::Column::IsStaff.eq(false))
91 .and(
92 usage::Column::MeasureId
93 .eq(requests_per_minute)
94 .or(usage::Column::MeasureId.eq(tokens_per_minute)),
95 ),
96 )
97 .stream(&*tx)
98 .await?;
99
100 let mut requests_this_minute = 0;
101 let mut tokens_this_minute = 0;
102 while let Some(usage) = usages.next().await {
103 let usage = usage?;
104 if usage.measure_id == requests_per_minute {
105 requests_this_minute += Self::get_live_buckets(
106 &usage,
107 now.naive_utc(),
108 UsageMeasure::RequestsPerMinute,
109 )
110 .0
111 .iter()
112 .copied()
113 .sum::<i64>() as usize;
114 } else if usage.measure_id == tokens_per_minute {
115 tokens_this_minute += Self::get_live_buckets(
116 &usage,
117 now.naive_utc(),
118 UsageMeasure::TokensPerMinute,
119 )
120 .0
121 .iter()
122 .copied()
123 .sum::<i64>() as usize;
124 }
125 }
126
127 results.push(ApplicationWideUsage {
128 provider,
129 model,
130 requests_this_minute,
131 tokens_this_minute,
132 })
133 }
134
135 Ok(results)
136 })
137 .await
138 }
139
140 pub async fn get_usage(
141 &self,
142 user_id: UserId,
143 provider: LanguageModelProvider,
144 model_name: &str,
145 now: DateTimeUtc,
146 ) -> Result<Usage> {
147 self.transaction(|tx| async move {
148 let model = self
149 .models
150 .get(&(provider, model_name.to_string()))
151 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
152
153 let usages = usage::Entity::find()
154 .filter(
155 usage::Column::UserId
156 .eq(user_id)
157 .and(usage::Column::ModelId.eq(model.id)),
158 )
159 .all(&*tx)
160 .await?;
161
162 let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
163 .filter(
164 lifetime_usage::Column::UserId
165 .eq(user_id)
166 .and(lifetime_usage::Column::ModelId.eq(model.id)),
167 )
168 .one(&*tx)
169 .await?
170 .map_or((0, 0), |usage| {
171 (usage.input_tokens as usize, usage.output_tokens as usize)
172 });
173
174 let requests_this_minute =
175 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
176 let tokens_this_minute =
177 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
178 let tokens_this_day =
179 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
180 let input_tokens_this_month =
181 self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
182 let output_tokens_this_month =
183 self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
184 let spending_this_month =
185 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
186 let lifetime_spending =
187 calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
188
189 Ok(Usage {
190 requests_this_minute,
191 tokens_this_minute,
192 tokens_this_day,
193 input_tokens_this_month,
194 output_tokens_this_month,
195 spending_this_month,
196 lifetime_spending,
197 })
198 })
199 .await
200 }
201
202 #[allow(clippy::too_many_arguments)]
203 pub async fn record_usage(
204 &self,
205 user_id: UserId,
206 is_staff: bool,
207 provider: LanguageModelProvider,
208 model_name: &str,
209 input_token_count: usize,
210 output_token_count: usize,
211 now: DateTimeUtc,
212 ) -> Result<Usage> {
213 self.transaction(|tx| async move {
214 let model = self.model(provider, model_name)?;
215
216 let usages = usage::Entity::find()
217 .filter(
218 usage::Column::UserId
219 .eq(user_id)
220 .and(usage::Column::ModelId.eq(model.id)),
221 )
222 .all(&*tx)
223 .await?;
224
225 let requests_this_minute = self
226 .update_usage_for_measure(
227 user_id,
228 is_staff,
229 model.id,
230 &usages,
231 UsageMeasure::RequestsPerMinute,
232 now,
233 1,
234 &tx,
235 )
236 .await?;
237 let tokens_this_minute = self
238 .update_usage_for_measure(
239 user_id,
240 is_staff,
241 model.id,
242 &usages,
243 UsageMeasure::TokensPerMinute,
244 now,
245 input_token_count + output_token_count,
246 &tx,
247 )
248 .await?;
249 let tokens_this_day = self
250 .update_usage_for_measure(
251 user_id,
252 is_staff,
253 model.id,
254 &usages,
255 UsageMeasure::TokensPerDay,
256 now,
257 input_token_count + output_token_count,
258 &tx,
259 )
260 .await?;
261 let input_tokens_this_month = self
262 .update_usage_for_measure(
263 user_id,
264 is_staff,
265 model.id,
266 &usages,
267 UsageMeasure::InputTokensPerMonth,
268 now,
269 input_token_count,
270 &tx,
271 )
272 .await?;
273 let output_tokens_this_month = self
274 .update_usage_for_measure(
275 user_id,
276 is_staff,
277 model.id,
278 &usages,
279 UsageMeasure::OutputTokensPerMonth,
280 now,
281 output_token_count,
282 &tx,
283 )
284 .await?;
285 let spending_this_month =
286 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
287
288 // Update lifetime usage
289 let lifetime_usage = lifetime_usage::Entity::find()
290 .filter(
291 lifetime_usage::Column::UserId
292 .eq(user_id)
293 .and(lifetime_usage::Column::ModelId.eq(model.id)),
294 )
295 .one(&*tx)
296 .await?;
297
298 let lifetime_usage = match lifetime_usage {
299 Some(usage) => {
300 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
301 id: ActiveValue::unchanged(usage.id),
302 input_tokens: ActiveValue::set(
303 usage.input_tokens + input_token_count as i64,
304 ),
305 output_tokens: ActiveValue::set(
306 usage.output_tokens + output_token_count as i64,
307 ),
308 ..Default::default()
309 })
310 .exec(&*tx)
311 .await?
312 }
313 None => {
314 lifetime_usage::ActiveModel {
315 user_id: ActiveValue::set(user_id),
316 model_id: ActiveValue::set(model.id),
317 input_tokens: ActiveValue::set(input_token_count as i64),
318 output_tokens: ActiveValue::set(output_token_count as i64),
319 ..Default::default()
320 }
321 .insert(&*tx)
322 .await?
323 }
324 };
325
326 let lifetime_spending = calculate_spending(
327 model,
328 lifetime_usage.input_tokens as usize,
329 lifetime_usage.output_tokens as usize,
330 );
331
332 Ok(Usage {
333 requests_this_minute,
334 tokens_this_minute,
335 tokens_this_day,
336 input_tokens_this_month,
337 output_tokens_this_month,
338 spending_this_month,
339 lifetime_spending,
340 })
341 })
342 .await
343 }
344
345 pub async fn get_active_user_count(&self, now: DateTimeUtc) -> Result<ActiveUserCount> {
346 self.transaction(|tx| async move {
347 let minute_since = now - Duration::minutes(5);
348 let day_since = now - Duration::days(5);
349
350 let users_in_recent_minutes = usage::Entity::find()
351 .filter(
352 usage::Column::Timestamp
353 .gte(minute_since.naive_utc())
354 .and(usage::Column::IsStaff.eq(false)),
355 )
356 .select_only()
357 .column(usage::Column::UserId)
358 .group_by(usage::Column::UserId)
359 .count(&*tx)
360 .await? as usize;
361
362 let users_in_recent_days = usage::Entity::find()
363 .filter(
364 usage::Column::Timestamp
365 .gte(day_since.naive_utc())
366 .and(usage::Column::IsStaff.eq(false)),
367 )
368 .select_only()
369 .column(usage::Column::UserId)
370 .group_by(usage::Column::UserId)
371 .count(&*tx)
372 .await? as usize;
373
374 Ok(ActiveUserCount {
375 users_in_recent_minutes,
376 users_in_recent_days,
377 })
378 })
379 .await
380 }
381
382 #[allow(clippy::too_many_arguments)]
383 async fn update_usage_for_measure(
384 &self,
385 user_id: UserId,
386 is_staff: bool,
387 model_id: ModelId,
388 usages: &[usage::Model],
389 usage_measure: UsageMeasure,
390 now: DateTimeUtc,
391 usage_to_add: usize,
392 tx: &DatabaseTransaction,
393 ) -> Result<usize> {
394 let now = now.naive_utc();
395 let measure_id = *self
396 .usage_measure_ids
397 .get(&usage_measure)
398 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
399
400 let mut id = None;
401 let mut timestamp = now;
402 let mut buckets = vec![0_i64];
403
404 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
405 id = Some(old_usage.id);
406 let (live_buckets, buckets_since) =
407 Self::get_live_buckets(old_usage, now, usage_measure);
408 if !live_buckets.is_empty() {
409 buckets.clear();
410 buckets.extend_from_slice(live_buckets);
411 buckets.extend(iter::repeat(0).take(buckets_since));
412 timestamp =
413 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
414 }
415 }
416
417 *buckets.last_mut().unwrap() += usage_to_add as i64;
418 let total_usage = buckets.iter().sum::<i64>() as usize;
419
420 let mut model = usage::ActiveModel {
421 user_id: ActiveValue::set(user_id),
422 is_staff: ActiveValue::set(is_staff),
423 model_id: ActiveValue::set(model_id),
424 measure_id: ActiveValue::set(measure_id),
425 timestamp: ActiveValue::set(timestamp),
426 buckets: ActiveValue::set(buckets),
427 ..Default::default()
428 };
429
430 if let Some(id) = id {
431 model.id = ActiveValue::unchanged(id);
432 model.update(tx).await?;
433 } else {
434 usage::Entity::insert(model)
435 .exec_without_returning(tx)
436 .await?;
437 }
438
439 Ok(total_usage)
440 }
441
442 fn get_usage_for_measure(
443 &self,
444 usages: &[usage::Model],
445 now: DateTimeUtc,
446 usage_measure: UsageMeasure,
447 ) -> Result<usize> {
448 let now = now.naive_utc();
449 let measure_id = *self
450 .usage_measure_ids
451 .get(&usage_measure)
452 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
453 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
454 return Ok(0);
455 };
456
457 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
458 Ok(live_buckets.iter().sum::<i64>() as _)
459 }
460
461 fn get_live_buckets(
462 usage: &usage::Model,
463 now: chrono::NaiveDateTime,
464 measure: UsageMeasure,
465 ) -> (&[i64], usize) {
466 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
467 let buckets_since_usage =
468 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
469 let buckets_since_usage = buckets_since_usage.ceil() as usize;
470 let mut live_buckets = &[] as &[i64];
471 if buckets_since_usage < measure.bucket_count() {
472 let expired_bucket_count =
473 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
474 live_buckets = &usage.buckets[expired_bucket_count..];
475 while live_buckets.first() == Some(&0) {
476 live_buckets = &live_buckets[1..];
477 }
478 }
479 (live_buckets, buckets_since_usage)
480 }
481}
482
483fn calculate_spending(
484 model: &model::Model,
485 input_tokens_this_month: usize,
486 output_tokens_this_month: usize,
487) -> usize {
488 let input_token_cost =
489 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
490 let output_token_cost =
491 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
492 input_token_cost + output_token_cost
493}
494
495const MINUTE_BUCKET_COUNT: usize = 12;
496const DAY_BUCKET_COUNT: usize = 48;
497const MONTH_BUCKET_COUNT: usize = 30;
498
499impl UsageMeasure {
500 fn bucket_count(&self) -> usize {
501 match self {
502 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
503 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
504 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
505 UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
506 UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
507 }
508 }
509
510 fn total_duration(&self) -> Duration {
511 match self {
512 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
513 UsageMeasure::TokensPerMinute => Duration::minutes(1),
514 UsageMeasure::TokensPerDay => Duration::hours(24),
515 UsageMeasure::InputTokensPerMonth => Duration::days(30),
516 UsageMeasure::OutputTokensPerMonth => Duration::days(30),
517 }
518 }
519
520 fn bucket_duration(&self) -> Duration {
521 self.total_duration() / self.bucket_count() as i32
522 }
523}