1use crate::db::UserId;
2use chrono::Duration;
3use rpc::LanguageModelProvider;
4use sea_orm::QuerySelect;
5use std::{iter, str::FromStr};
6use strum::IntoEnumIterator as _;
7
8use super::*;
9
10#[derive(Debug, PartialEq, Clone, Copy)]
11pub struct Usage {
12 pub requests_this_minute: usize,
13 pub tokens_this_minute: usize,
14 pub tokens_this_day: usize,
15 pub input_tokens_this_month: usize,
16 pub output_tokens_this_month: usize,
17 pub spending_this_month: usize,
18 pub lifetime_spending: usize,
19}
20
21#[derive(Clone, Copy, Debug, Default)]
22pub struct ActiveUserCount {
23 pub users_in_recent_minutes: usize,
24 pub users_in_recent_days: usize,
25}
26
27impl LlmDatabase {
28 pub async fn initialize_usage_measures(&mut self) -> Result<()> {
29 let all_measures = self
30 .transaction(|tx| async move {
31 let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
32
33 let new_measures = UsageMeasure::iter()
34 .filter(|measure| {
35 !existing_measures
36 .iter()
37 .any(|m| m.name == measure.to_string())
38 })
39 .map(|measure| usage_measure::ActiveModel {
40 name: ActiveValue::set(measure.to_string()),
41 ..Default::default()
42 })
43 .collect::<Vec<_>>();
44
45 if !new_measures.is_empty() {
46 usage_measure::Entity::insert_many(new_measures)
47 .exec(&*tx)
48 .await?;
49 }
50
51 Ok(usage_measure::Entity::find().all(&*tx).await?)
52 })
53 .await?;
54
55 self.usage_measure_ids = all_measures
56 .into_iter()
57 .filter_map(|measure| {
58 UsageMeasure::from_str(&measure.name)
59 .ok()
60 .map(|um| (um, measure.id))
61 })
62 .collect();
63 Ok(())
64 }
65
66 pub async fn get_usage(
67 &self,
68 user_id: UserId,
69 provider: LanguageModelProvider,
70 model_name: &str,
71 now: DateTimeUtc,
72 ) -> Result<Usage> {
73 self.transaction(|tx| async move {
74 let model = self
75 .models
76 .get(&(provider, model_name.to_string()))
77 .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
78
79 let usages = usage::Entity::find()
80 .filter(
81 usage::Column::UserId
82 .eq(user_id)
83 .and(usage::Column::ModelId.eq(model.id)),
84 )
85 .all(&*tx)
86 .await?;
87
88 let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
89 .filter(
90 lifetime_usage::Column::UserId
91 .eq(user_id)
92 .and(lifetime_usage::Column::ModelId.eq(model.id)),
93 )
94 .one(&*tx)
95 .await?
96 .map_or((0, 0), |usage| {
97 (usage.input_tokens as usize, usage.output_tokens as usize)
98 });
99
100 let requests_this_minute =
101 self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
102 let tokens_this_minute =
103 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
104 let tokens_this_day =
105 self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
106 let input_tokens_this_month =
107 self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
108 let output_tokens_this_month =
109 self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
110 let spending_this_month =
111 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
112 let lifetime_spending =
113 calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
114
115 Ok(Usage {
116 requests_this_minute,
117 tokens_this_minute,
118 tokens_this_day,
119 input_tokens_this_month,
120 output_tokens_this_month,
121 spending_this_month,
122 lifetime_spending,
123 })
124 })
125 .await
126 }
127
128 #[allow(clippy::too_many_arguments)]
129 pub async fn record_usage(
130 &self,
131 user_id: UserId,
132 is_staff: bool,
133 provider: LanguageModelProvider,
134 model_name: &str,
135 input_token_count: usize,
136 output_token_count: usize,
137 now: DateTimeUtc,
138 ) -> Result<Usage> {
139 self.transaction(|tx| async move {
140 let model = self.model(provider, model_name)?;
141
142 let usages = usage::Entity::find()
143 .filter(
144 usage::Column::UserId
145 .eq(user_id)
146 .and(usage::Column::ModelId.eq(model.id)),
147 )
148 .all(&*tx)
149 .await?;
150
151 let requests_this_minute = self
152 .update_usage_for_measure(
153 user_id,
154 is_staff,
155 model.id,
156 &usages,
157 UsageMeasure::RequestsPerMinute,
158 now,
159 1,
160 &tx,
161 )
162 .await?;
163 let tokens_this_minute = self
164 .update_usage_for_measure(
165 user_id,
166 is_staff,
167 model.id,
168 &usages,
169 UsageMeasure::TokensPerMinute,
170 now,
171 input_token_count + output_token_count,
172 &tx,
173 )
174 .await?;
175 let tokens_this_day = self
176 .update_usage_for_measure(
177 user_id,
178 is_staff,
179 model.id,
180 &usages,
181 UsageMeasure::TokensPerDay,
182 now,
183 input_token_count + output_token_count,
184 &tx,
185 )
186 .await?;
187 let input_tokens_this_month = self
188 .update_usage_for_measure(
189 user_id,
190 is_staff,
191 model.id,
192 &usages,
193 UsageMeasure::InputTokensPerMonth,
194 now,
195 input_token_count,
196 &tx,
197 )
198 .await?;
199 let output_tokens_this_month = self
200 .update_usage_for_measure(
201 user_id,
202 is_staff,
203 model.id,
204 &usages,
205 UsageMeasure::OutputTokensPerMonth,
206 now,
207 output_token_count,
208 &tx,
209 )
210 .await?;
211 let spending_this_month =
212 calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
213
214 // Update lifetime usage
215 let lifetime_usage = lifetime_usage::Entity::find()
216 .filter(
217 lifetime_usage::Column::UserId
218 .eq(user_id)
219 .and(lifetime_usage::Column::ModelId.eq(model.id)),
220 )
221 .one(&*tx)
222 .await?;
223
224 let lifetime_usage = match lifetime_usage {
225 Some(usage) => {
226 lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
227 id: ActiveValue::unchanged(usage.id),
228 input_tokens: ActiveValue::set(
229 usage.input_tokens + input_token_count as i64,
230 ),
231 output_tokens: ActiveValue::set(
232 usage.output_tokens + output_token_count as i64,
233 ),
234 ..Default::default()
235 })
236 .exec(&*tx)
237 .await?
238 }
239 None => {
240 lifetime_usage::ActiveModel {
241 user_id: ActiveValue::set(user_id),
242 model_id: ActiveValue::set(model.id),
243 input_tokens: ActiveValue::set(input_token_count as i64),
244 output_tokens: ActiveValue::set(output_token_count as i64),
245 ..Default::default()
246 }
247 .insert(&*tx)
248 .await?
249 }
250 };
251
252 let lifetime_spending = calculate_spending(
253 model,
254 lifetime_usage.input_tokens as usize,
255 lifetime_usage.output_tokens as usize,
256 );
257
258 Ok(Usage {
259 requests_this_minute,
260 tokens_this_minute,
261 tokens_this_day,
262 input_tokens_this_month,
263 output_tokens_this_month,
264 spending_this_month,
265 lifetime_spending,
266 })
267 })
268 .await
269 }
270
271 pub async fn get_active_user_count(&self, now: DateTimeUtc) -> Result<ActiveUserCount> {
272 self.transaction(|tx| async move {
273 let minute_since = now - Duration::minutes(5);
274 let day_since = now - Duration::days(5);
275
276 let users_in_recent_minutes = usage::Entity::find()
277 .filter(
278 usage::Column::Timestamp
279 .gte(minute_since.naive_utc())
280 .and(usage::Column::IsStaff.eq(false)),
281 )
282 .select_only()
283 .column(usage::Column::UserId)
284 .group_by(usage::Column::UserId)
285 .count(&*tx)
286 .await? as usize;
287
288 let users_in_recent_days = usage::Entity::find()
289 .filter(
290 usage::Column::Timestamp
291 .gte(day_since.naive_utc())
292 .and(usage::Column::IsStaff.eq(false)),
293 )
294 .select_only()
295 .column(usage::Column::UserId)
296 .group_by(usage::Column::UserId)
297 .count(&*tx)
298 .await? as usize;
299
300 Ok(ActiveUserCount {
301 users_in_recent_minutes,
302 users_in_recent_days,
303 })
304 })
305 .await
306 }
307
308 #[allow(clippy::too_many_arguments)]
309 async fn update_usage_for_measure(
310 &self,
311 user_id: UserId,
312 is_staff: bool,
313 model_id: ModelId,
314 usages: &[usage::Model],
315 usage_measure: UsageMeasure,
316 now: DateTimeUtc,
317 usage_to_add: usize,
318 tx: &DatabaseTransaction,
319 ) -> Result<usize> {
320 let now = now.naive_utc();
321 let measure_id = *self
322 .usage_measure_ids
323 .get(&usage_measure)
324 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
325
326 let mut id = None;
327 let mut timestamp = now;
328 let mut buckets = vec![0_i64];
329
330 if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
331 id = Some(old_usage.id);
332 let (live_buckets, buckets_since) =
333 Self::get_live_buckets(old_usage, now, usage_measure);
334 if !live_buckets.is_empty() {
335 buckets.clear();
336 buckets.extend_from_slice(live_buckets);
337 buckets.extend(iter::repeat(0).take(buckets_since));
338 timestamp =
339 old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
340 }
341 }
342
343 *buckets.last_mut().unwrap() += usage_to_add as i64;
344 let total_usage = buckets.iter().sum::<i64>() as usize;
345
346 let mut model = usage::ActiveModel {
347 user_id: ActiveValue::set(user_id),
348 is_staff: ActiveValue::set(is_staff),
349 model_id: ActiveValue::set(model_id),
350 measure_id: ActiveValue::set(measure_id),
351 timestamp: ActiveValue::set(timestamp),
352 buckets: ActiveValue::set(buckets),
353 ..Default::default()
354 };
355
356 if let Some(id) = id {
357 model.id = ActiveValue::unchanged(id);
358 model.update(tx).await?;
359 } else {
360 usage::Entity::insert(model)
361 .exec_without_returning(tx)
362 .await?;
363 }
364
365 Ok(total_usage)
366 }
367
368 fn get_usage_for_measure(
369 &self,
370 usages: &[usage::Model],
371 now: DateTimeUtc,
372 usage_measure: UsageMeasure,
373 ) -> Result<usize> {
374 let now = now.naive_utc();
375 let measure_id = *self
376 .usage_measure_ids
377 .get(&usage_measure)
378 .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
379 let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
380 return Ok(0);
381 };
382
383 let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
384 Ok(live_buckets.iter().sum::<i64>() as _)
385 }
386
387 fn get_live_buckets(
388 usage: &usage::Model,
389 now: chrono::NaiveDateTime,
390 measure: UsageMeasure,
391 ) -> (&[i64], usize) {
392 let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
393 let buckets_since_usage =
394 seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
395 let buckets_since_usage = buckets_since_usage.ceil() as usize;
396 let mut live_buckets = &[] as &[i64];
397 if buckets_since_usage < measure.bucket_count() {
398 let expired_bucket_count =
399 (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
400 live_buckets = &usage.buckets[expired_bucket_count..];
401 while live_buckets.first() == Some(&0) {
402 live_buckets = &live_buckets[1..];
403 }
404 }
405 (live_buckets, buckets_since_usage)
406 }
407}
408
409fn calculate_spending(
410 model: &model::Model,
411 input_tokens_this_month: usize,
412 output_tokens_this_month: usize,
413) -> usize {
414 let input_token_cost =
415 input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
416 let output_token_cost =
417 output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
418 input_token_cost + output_token_cost
419}
420
421const MINUTE_BUCKET_COUNT: usize = 12;
422const DAY_BUCKET_COUNT: usize = 48;
423const MONTH_BUCKET_COUNT: usize = 30;
424
425impl UsageMeasure {
426 fn bucket_count(&self) -> usize {
427 match self {
428 UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
429 UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
430 UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
431 UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
432 UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
433 }
434 }
435
436 fn total_duration(&self) -> Duration {
437 match self {
438 UsageMeasure::RequestsPerMinute => Duration::minutes(1),
439 UsageMeasure::TokensPerMinute => Duration::minutes(1),
440 UsageMeasure::TokensPerDay => Duration::hours(24),
441 UsageMeasure::InputTokensPerMonth => Duration::days(30),
442 UsageMeasure::OutputTokensPerMonth => Duration::days(30),
443 }
444 }
445
446 fn bucket_duration(&self) -> Duration {
447 self.total_duration() / self.bucket_count() as i32
448 }
449}