usages.rs

  1use crate::db::UserId;
  2use chrono::Duration;
  3use futures::StreamExt as _;
  4use rpc::LanguageModelProvider;
  5use sea_orm::QuerySelect;
  6use std::{iter, str::FromStr};
  7use strum::IntoEnumIterator as _;
  8
  9use super::*;
 10
 11#[derive(Debug, PartialEq, Clone, Copy)]
 12pub struct Usage {
 13    pub requests_this_minute: usize,
 14    pub tokens_this_minute: usize,
 15    pub tokens_this_day: usize,
 16    pub input_tokens_this_month: usize,
 17    pub output_tokens_this_month: usize,
 18    pub spending_this_month: usize,
 19    pub lifetime_spending: usize,
 20}
 21
 22#[derive(Debug, PartialEq, Clone)]
 23pub struct ApplicationWideUsage {
 24    pub provider: LanguageModelProvider,
 25    pub model: String,
 26    pub requests_this_minute: usize,
 27    pub tokens_this_minute: usize,
 28}
 29
 30#[derive(Clone, Copy, Debug, Default)]
 31pub struct ActiveUserCount {
 32    pub users_in_recent_minutes: usize,
 33    pub users_in_recent_days: usize,
 34}
 35
 36impl LlmDatabase {
 37    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 38        let all_measures = self
 39            .transaction(|tx| async move {
 40                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 41
 42                let new_measures = UsageMeasure::iter()
 43                    .filter(|measure| {
 44                        !existing_measures
 45                            .iter()
 46                            .any(|m| m.name == measure.to_string())
 47                    })
 48                    .map(|measure| usage_measure::ActiveModel {
 49                        name: ActiveValue::set(measure.to_string()),
 50                        ..Default::default()
 51                    })
 52                    .collect::<Vec<_>>();
 53
 54                if !new_measures.is_empty() {
 55                    usage_measure::Entity::insert_many(new_measures)
 56                        .exec(&*tx)
 57                        .await?;
 58                }
 59
 60                Ok(usage_measure::Entity::find().all(&*tx).await?)
 61            })
 62            .await?;
 63
 64        self.usage_measure_ids = all_measures
 65            .into_iter()
 66            .filter_map(|measure| {
 67                UsageMeasure::from_str(&measure.name)
 68                    .ok()
 69                    .map(|um| (um, measure.id))
 70            })
 71            .collect();
 72        Ok(())
 73    }
 74
 75    pub async fn get_application_wide_usages_by_model(
 76        &self,
 77        now: DateTimeUtc,
 78    ) -> Result<Vec<ApplicationWideUsage>> {
 79        self.transaction(|tx| async move {
 80            let past_minute = now - Duration::minutes(1);
 81            let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
 82            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
 83
 84            let mut results = Vec::new();
 85            for (provider, model) in self.models.keys().cloned() {
 86                let mut usages = usage::Entity::find()
 87                    .filter(
 88                        usage::Column::Timestamp
 89                            .gte(past_minute.naive_utc())
 90                            .and(usage::Column::IsStaff.eq(false))
 91                            .and(
 92                                usage::Column::MeasureId
 93                                    .eq(requests_per_minute)
 94                                    .or(usage::Column::MeasureId.eq(tokens_per_minute)),
 95                            ),
 96                    )
 97                    .stream(&*tx)
 98                    .await?;
 99
100                let mut requests_this_minute = 0;
101                let mut tokens_this_minute = 0;
102                while let Some(usage) = usages.next().await {
103                    let usage = usage?;
104                    if usage.measure_id == requests_per_minute {
105                        requests_this_minute += Self::get_live_buckets(
106                            &usage,
107                            now.naive_utc(),
108                            UsageMeasure::RequestsPerMinute,
109                        )
110                        .0
111                        .iter()
112                        .copied()
113                        .sum::<i64>() as usize;
114                    } else if usage.measure_id == tokens_per_minute {
115                        tokens_this_minute += Self::get_live_buckets(
116                            &usage,
117                            now.naive_utc(),
118                            UsageMeasure::TokensPerMinute,
119                        )
120                        .0
121                        .iter()
122                        .copied()
123                        .sum::<i64>() as usize;
124                    }
125                }
126
127                results.push(ApplicationWideUsage {
128                    provider,
129                    model,
130                    requests_this_minute,
131                    tokens_this_minute,
132                })
133            }
134
135            Ok(results)
136        })
137        .await
138    }
139
140    pub async fn get_usage(
141        &self,
142        user_id: UserId,
143        provider: LanguageModelProvider,
144        model_name: &str,
145        now: DateTimeUtc,
146    ) -> Result<Usage> {
147        self.transaction(|tx| async move {
148            let model = self
149                .models
150                .get(&(provider, model_name.to_string()))
151                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
152
153            let usages = usage::Entity::find()
154                .filter(
155                    usage::Column::UserId
156                        .eq(user_id)
157                        .and(usage::Column::ModelId.eq(model.id)),
158                )
159                .all(&*tx)
160                .await?;
161
162            let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
163                .filter(
164                    lifetime_usage::Column::UserId
165                        .eq(user_id)
166                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
167                )
168                .one(&*tx)
169                .await?
170                .map_or((0, 0), |usage| {
171                    (usage.input_tokens as usize, usage.output_tokens as usize)
172                });
173
174            let requests_this_minute =
175                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
176            let tokens_this_minute =
177                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
178            let tokens_this_day =
179                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
180            let input_tokens_this_month =
181                self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
182            let output_tokens_this_month =
183                self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
184            let spending_this_month =
185                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
186            let lifetime_spending =
187                calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
188
189            Ok(Usage {
190                requests_this_minute,
191                tokens_this_minute,
192                tokens_this_day,
193                input_tokens_this_month,
194                output_tokens_this_month,
195                spending_this_month,
196                lifetime_spending,
197            })
198        })
199        .await
200    }
201
202    #[allow(clippy::too_many_arguments)]
203    pub async fn record_usage(
204        &self,
205        user_id: UserId,
206        is_staff: bool,
207        provider: LanguageModelProvider,
208        model_name: &str,
209        input_token_count: usize,
210        output_token_count: usize,
211        now: DateTimeUtc,
212    ) -> Result<Usage> {
213        self.transaction(|tx| async move {
214            let model = self.model(provider, model_name)?;
215
216            let usages = usage::Entity::find()
217                .filter(
218                    usage::Column::UserId
219                        .eq(user_id)
220                        .and(usage::Column::ModelId.eq(model.id)),
221                )
222                .all(&*tx)
223                .await?;
224
225            let requests_this_minute = self
226                .update_usage_for_measure(
227                    user_id,
228                    is_staff,
229                    model.id,
230                    &usages,
231                    UsageMeasure::RequestsPerMinute,
232                    now,
233                    1,
234                    &tx,
235                )
236                .await?;
237            let tokens_this_minute = self
238                .update_usage_for_measure(
239                    user_id,
240                    is_staff,
241                    model.id,
242                    &usages,
243                    UsageMeasure::TokensPerMinute,
244                    now,
245                    input_token_count + output_token_count,
246                    &tx,
247                )
248                .await?;
249            let tokens_this_day = self
250                .update_usage_for_measure(
251                    user_id,
252                    is_staff,
253                    model.id,
254                    &usages,
255                    UsageMeasure::TokensPerDay,
256                    now,
257                    input_token_count + output_token_count,
258                    &tx,
259                )
260                .await?;
261            let input_tokens_this_month = self
262                .update_usage_for_measure(
263                    user_id,
264                    is_staff,
265                    model.id,
266                    &usages,
267                    UsageMeasure::InputTokensPerMonth,
268                    now,
269                    input_token_count,
270                    &tx,
271                )
272                .await?;
273            let output_tokens_this_month = self
274                .update_usage_for_measure(
275                    user_id,
276                    is_staff,
277                    model.id,
278                    &usages,
279                    UsageMeasure::OutputTokensPerMonth,
280                    now,
281                    output_token_count,
282                    &tx,
283                )
284                .await?;
285            let spending_this_month =
286                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
287
288            // Update lifetime usage
289            let lifetime_usage = lifetime_usage::Entity::find()
290                .filter(
291                    lifetime_usage::Column::UserId
292                        .eq(user_id)
293                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
294                )
295                .one(&*tx)
296                .await?;
297
298            let lifetime_usage = match lifetime_usage {
299                Some(usage) => {
300                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
301                        id: ActiveValue::unchanged(usage.id),
302                        input_tokens: ActiveValue::set(
303                            usage.input_tokens + input_token_count as i64,
304                        ),
305                        output_tokens: ActiveValue::set(
306                            usage.output_tokens + output_token_count as i64,
307                        ),
308                        ..Default::default()
309                    })
310                    .exec(&*tx)
311                    .await?
312                }
313                None => {
314                    lifetime_usage::ActiveModel {
315                        user_id: ActiveValue::set(user_id),
316                        model_id: ActiveValue::set(model.id),
317                        input_tokens: ActiveValue::set(input_token_count as i64),
318                        output_tokens: ActiveValue::set(output_token_count as i64),
319                        ..Default::default()
320                    }
321                    .insert(&*tx)
322                    .await?
323                }
324            };
325
326            let lifetime_spending = calculate_spending(
327                model,
328                lifetime_usage.input_tokens as usize,
329                lifetime_usage.output_tokens as usize,
330            );
331
332            Ok(Usage {
333                requests_this_minute,
334                tokens_this_minute,
335                tokens_this_day,
336                input_tokens_this_month,
337                output_tokens_this_month,
338                spending_this_month,
339                lifetime_spending,
340            })
341        })
342        .await
343    }
344
345    pub async fn get_active_user_count(&self, now: DateTimeUtc) -> Result<ActiveUserCount> {
346        self.transaction(|tx| async move {
347            let minute_since = now - Duration::minutes(5);
348            let day_since = now - Duration::days(5);
349
350            let users_in_recent_minutes = usage::Entity::find()
351                .filter(
352                    usage::Column::Timestamp
353                        .gte(minute_since.naive_utc())
354                        .and(usage::Column::IsStaff.eq(false)),
355                )
356                .select_only()
357                .column(usage::Column::UserId)
358                .group_by(usage::Column::UserId)
359                .count(&*tx)
360                .await? as usize;
361
362            let users_in_recent_days = usage::Entity::find()
363                .filter(
364                    usage::Column::Timestamp
365                        .gte(day_since.naive_utc())
366                        .and(usage::Column::IsStaff.eq(false)),
367                )
368                .select_only()
369                .column(usage::Column::UserId)
370                .group_by(usage::Column::UserId)
371                .count(&*tx)
372                .await? as usize;
373
374            Ok(ActiveUserCount {
375                users_in_recent_minutes,
376                users_in_recent_days,
377            })
378        })
379        .await
380    }
381
382    #[allow(clippy::too_many_arguments)]
383    async fn update_usage_for_measure(
384        &self,
385        user_id: UserId,
386        is_staff: bool,
387        model_id: ModelId,
388        usages: &[usage::Model],
389        usage_measure: UsageMeasure,
390        now: DateTimeUtc,
391        usage_to_add: usize,
392        tx: &DatabaseTransaction,
393    ) -> Result<usize> {
394        let now = now.naive_utc();
395        let measure_id = *self
396            .usage_measure_ids
397            .get(&usage_measure)
398            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
399
400        let mut id = None;
401        let mut timestamp = now;
402        let mut buckets = vec![0_i64];
403
404        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
405            id = Some(old_usage.id);
406            let (live_buckets, buckets_since) =
407                Self::get_live_buckets(old_usage, now, usage_measure);
408            if !live_buckets.is_empty() {
409                buckets.clear();
410                buckets.extend_from_slice(live_buckets);
411                buckets.extend(iter::repeat(0).take(buckets_since));
412                timestamp =
413                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
414            }
415        }
416
417        *buckets.last_mut().unwrap() += usage_to_add as i64;
418        let total_usage = buckets.iter().sum::<i64>() as usize;
419
420        let mut model = usage::ActiveModel {
421            user_id: ActiveValue::set(user_id),
422            is_staff: ActiveValue::set(is_staff),
423            model_id: ActiveValue::set(model_id),
424            measure_id: ActiveValue::set(measure_id),
425            timestamp: ActiveValue::set(timestamp),
426            buckets: ActiveValue::set(buckets),
427            ..Default::default()
428        };
429
430        if let Some(id) = id {
431            model.id = ActiveValue::unchanged(id);
432            model.update(tx).await?;
433        } else {
434            usage::Entity::insert(model)
435                .exec_without_returning(tx)
436                .await?;
437        }
438
439        Ok(total_usage)
440    }
441
442    fn get_usage_for_measure(
443        &self,
444        usages: &[usage::Model],
445        now: DateTimeUtc,
446        usage_measure: UsageMeasure,
447    ) -> Result<usize> {
448        let now = now.naive_utc();
449        let measure_id = *self
450            .usage_measure_ids
451            .get(&usage_measure)
452            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
453        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
454            return Ok(0);
455        };
456
457        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
458        Ok(live_buckets.iter().sum::<i64>() as _)
459    }
460
461    fn get_live_buckets(
462        usage: &usage::Model,
463        now: chrono::NaiveDateTime,
464        measure: UsageMeasure,
465    ) -> (&[i64], usize) {
466        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
467        let buckets_since_usage =
468            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
469        let buckets_since_usage = buckets_since_usage.ceil() as usize;
470        let mut live_buckets = &[] as &[i64];
471        if buckets_since_usage < measure.bucket_count() {
472            let expired_bucket_count =
473                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
474            live_buckets = &usage.buckets[expired_bucket_count..];
475            while live_buckets.first() == Some(&0) {
476                live_buckets = &live_buckets[1..];
477            }
478        }
479        (live_buckets, buckets_since_usage)
480    }
481}
482
483fn calculate_spending(
484    model: &model::Model,
485    input_tokens_this_month: usize,
486    output_tokens_this_month: usize,
487) -> usize {
488    let input_token_cost =
489        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
490    let output_token_cost =
491        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
492    input_token_cost + output_token_cost
493}
494
/// Bucket count for the per-minute measures (a one-minute window split into
/// 5-second buckets).
const MINUTE_BUCKET_COUNT: usize = 12;
/// Bucket count for the per-day measure (a 24-hour window split into
/// 30-minute buckets).
const DAY_BUCKET_COUNT: usize = 48;
/// Bucket count for the per-month measures (a 30-day window split into
/// one-day buckets).
const MONTH_BUCKET_COUNT: usize = 30;
498
499impl UsageMeasure {
500    fn bucket_count(&self) -> usize {
501        match self {
502            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
503            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
504            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
505            UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
506            UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
507        }
508    }
509
510    fn total_duration(&self) -> Duration {
511        match self {
512            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
513            UsageMeasure::TokensPerMinute => Duration::minutes(1),
514            UsageMeasure::TokensPerDay => Duration::hours(24),
515            UsageMeasure::InputTokensPerMonth => Duration::days(30),
516            UsageMeasure::OutputTokensPerMonth => Duration::days(30),
517        }
518    }
519
520    fn bucket_duration(&self) -> Duration {
521        self.total_duration() / self.bucket_count() as i32
522    }
523}