usages.rs

  1use crate::db::UserId;
  2use chrono::Duration;
  3use futures::StreamExt as _;
  4use rpc::LanguageModelProvider;
  5use sea_orm::QuerySelect;
  6use std::{iter, str::FromStr};
  7use strum::IntoEnumIterator as _;
  8
  9use super::*;
 10
 11#[derive(Debug, PartialEq, Clone, Copy)]
 12pub struct Usage {
 13    pub requests_this_minute: usize,
 14    pub tokens_this_minute: usize,
 15    pub tokens_this_day: usize,
 16    pub input_tokens_this_month: usize,
 17    pub output_tokens_this_month: usize,
 18    pub spending_this_month: usize,
 19    pub lifetime_spending: usize,
 20}
 21
 22#[derive(Debug, PartialEq, Clone)]
 23pub struct ApplicationWideUsage {
 24    pub provider: LanguageModelProvider,
 25    pub model: String,
 26    pub requests_this_minute: usize,
 27    pub tokens_this_minute: usize,
 28}
 29
 30#[derive(Clone, Copy, Debug, Default)]
 31pub struct ActiveUserCount {
 32    pub users_in_recent_minutes: usize,
 33    pub users_in_recent_days: usize,
 34}
 35
 36impl LlmDatabase {
 37    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 38        let all_measures = self
 39            .transaction(|tx| async move {
 40                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 41
 42                let new_measures = UsageMeasure::iter()
 43                    .filter(|measure| {
 44                        !existing_measures
 45                            .iter()
 46                            .any(|m| m.name == measure.to_string())
 47                    })
 48                    .map(|measure| usage_measure::ActiveModel {
 49                        name: ActiveValue::set(measure.to_string()),
 50                        ..Default::default()
 51                    })
 52                    .collect::<Vec<_>>();
 53
 54                if !new_measures.is_empty() {
 55                    usage_measure::Entity::insert_many(new_measures)
 56                        .exec(&*tx)
 57                        .await?;
 58                }
 59
 60                Ok(usage_measure::Entity::find().all(&*tx).await?)
 61            })
 62            .await?;
 63
 64        self.usage_measure_ids = all_measures
 65            .into_iter()
 66            .filter_map(|measure| {
 67                UsageMeasure::from_str(&measure.name)
 68                    .ok()
 69                    .map(|um| (um, measure.id))
 70            })
 71            .collect();
 72        Ok(())
 73    }
 74
 75    pub async fn get_application_wide_usages_by_model(
 76        &self,
 77        now: DateTimeUtc,
 78    ) -> Result<Vec<ApplicationWideUsage>> {
 79        self.transaction(|tx| async move {
 80            let past_minute = now - Duration::minutes(1);
 81            let requests_per_minute = self.usage_measure_ids[&UsageMeasure::RequestsPerMinute];
 82            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
 83
 84            let mut results = Vec::new();
 85            for ((provider, model_name), model) in self.models.iter() {
 86                let mut usages = usage::Entity::find()
 87                    .filter(
 88                        usage::Column::Timestamp
 89                            .gte(past_minute.naive_utc())
 90                            .and(usage::Column::IsStaff.eq(false))
 91                            .and(usage::Column::ModelId.eq(model.id))
 92                            .and(
 93                                usage::Column::MeasureId
 94                                    .eq(requests_per_minute)
 95                                    .or(usage::Column::MeasureId.eq(tokens_per_minute)),
 96                            ),
 97                    )
 98                    .stream(&*tx)
 99                    .await?;
100
101                let mut requests_this_minute = 0;
102                let mut tokens_this_minute = 0;
103                while let Some(usage) = usages.next().await {
104                    let usage = usage?;
105                    if usage.measure_id == requests_per_minute {
106                        requests_this_minute += Self::get_live_buckets(
107                            &usage,
108                            now.naive_utc(),
109                            UsageMeasure::RequestsPerMinute,
110                        )
111                        .0
112                        .iter()
113                        .copied()
114                        .sum::<i64>() as usize;
115                    } else if usage.measure_id == tokens_per_minute {
116                        tokens_this_minute += Self::get_live_buckets(
117                            &usage,
118                            now.naive_utc(),
119                            UsageMeasure::TokensPerMinute,
120                        )
121                        .0
122                        .iter()
123                        .copied()
124                        .sum::<i64>() as usize;
125                    }
126                }
127
128                results.push(ApplicationWideUsage {
129                    provider: *provider,
130                    model: model_name.clone(),
131                    requests_this_minute,
132                    tokens_this_minute,
133                })
134            }
135
136            Ok(results)
137        })
138        .await
139    }
140
141    pub async fn get_usage(
142        &self,
143        user_id: UserId,
144        provider: LanguageModelProvider,
145        model_name: &str,
146        now: DateTimeUtc,
147    ) -> Result<Usage> {
148        self.transaction(|tx| async move {
149            let model = self
150                .models
151                .get(&(provider, model_name.to_string()))
152                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
153
154            let usages = usage::Entity::find()
155                .filter(
156                    usage::Column::UserId
157                        .eq(user_id)
158                        .and(usage::Column::ModelId.eq(model.id)),
159                )
160                .all(&*tx)
161                .await?;
162
163            let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
164                .filter(
165                    lifetime_usage::Column::UserId
166                        .eq(user_id)
167                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
168                )
169                .one(&*tx)
170                .await?
171                .map_or((0, 0), |usage| {
172                    (usage.input_tokens as usize, usage.output_tokens as usize)
173                });
174
175            let requests_this_minute =
176                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
177            let tokens_this_minute =
178                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
179            let tokens_this_day =
180                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
181            let input_tokens_this_month =
182                self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
183            let output_tokens_this_month =
184                self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
185            let spending_this_month =
186                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
187            let lifetime_spending =
188                calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
189
190            Ok(Usage {
191                requests_this_minute,
192                tokens_this_minute,
193                tokens_this_day,
194                input_tokens_this_month,
195                output_tokens_this_month,
196                spending_this_month,
197                lifetime_spending,
198            })
199        })
200        .await
201    }
202
203    #[allow(clippy::too_many_arguments)]
204    pub async fn record_usage(
205        &self,
206        user_id: UserId,
207        is_staff: bool,
208        provider: LanguageModelProvider,
209        model_name: &str,
210        input_token_count: usize,
211        output_token_count: usize,
212        now: DateTimeUtc,
213    ) -> Result<Usage> {
214        self.transaction(|tx| async move {
215            let model = self.model(provider, model_name)?;
216
217            let usages = usage::Entity::find()
218                .filter(
219                    usage::Column::UserId
220                        .eq(user_id)
221                        .and(usage::Column::ModelId.eq(model.id)),
222                )
223                .all(&*tx)
224                .await?;
225
226            let requests_this_minute = self
227                .update_usage_for_measure(
228                    user_id,
229                    is_staff,
230                    model.id,
231                    &usages,
232                    UsageMeasure::RequestsPerMinute,
233                    now,
234                    1,
235                    &tx,
236                )
237                .await?;
238            let tokens_this_minute = self
239                .update_usage_for_measure(
240                    user_id,
241                    is_staff,
242                    model.id,
243                    &usages,
244                    UsageMeasure::TokensPerMinute,
245                    now,
246                    input_token_count + output_token_count,
247                    &tx,
248                )
249                .await?;
250            let tokens_this_day = self
251                .update_usage_for_measure(
252                    user_id,
253                    is_staff,
254                    model.id,
255                    &usages,
256                    UsageMeasure::TokensPerDay,
257                    now,
258                    input_token_count + output_token_count,
259                    &tx,
260                )
261                .await?;
262            let input_tokens_this_month = self
263                .update_usage_for_measure(
264                    user_id,
265                    is_staff,
266                    model.id,
267                    &usages,
268                    UsageMeasure::InputTokensPerMonth,
269                    now,
270                    input_token_count,
271                    &tx,
272                )
273                .await?;
274            let output_tokens_this_month = self
275                .update_usage_for_measure(
276                    user_id,
277                    is_staff,
278                    model.id,
279                    &usages,
280                    UsageMeasure::OutputTokensPerMonth,
281                    now,
282                    output_token_count,
283                    &tx,
284                )
285                .await?;
286            let spending_this_month =
287                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
288
289            // Update lifetime usage
290            let lifetime_usage = lifetime_usage::Entity::find()
291                .filter(
292                    lifetime_usage::Column::UserId
293                        .eq(user_id)
294                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
295                )
296                .one(&*tx)
297                .await?;
298
299            let lifetime_usage = match lifetime_usage {
300                Some(usage) => {
301                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
302                        id: ActiveValue::unchanged(usage.id),
303                        input_tokens: ActiveValue::set(
304                            usage.input_tokens + input_token_count as i64,
305                        ),
306                        output_tokens: ActiveValue::set(
307                            usage.output_tokens + output_token_count as i64,
308                        ),
309                        ..Default::default()
310                    })
311                    .exec(&*tx)
312                    .await?
313                }
314                None => {
315                    lifetime_usage::ActiveModel {
316                        user_id: ActiveValue::set(user_id),
317                        model_id: ActiveValue::set(model.id),
318                        input_tokens: ActiveValue::set(input_token_count as i64),
319                        output_tokens: ActiveValue::set(output_token_count as i64),
320                        ..Default::default()
321                    }
322                    .insert(&*tx)
323                    .await?
324                }
325            };
326
327            let lifetime_spending = calculate_spending(
328                model,
329                lifetime_usage.input_tokens as usize,
330                lifetime_usage.output_tokens as usize,
331            );
332
333            Ok(Usage {
334                requests_this_minute,
335                tokens_this_minute,
336                tokens_this_day,
337                input_tokens_this_month,
338                output_tokens_this_month,
339                spending_this_month,
340                lifetime_spending,
341            })
342        })
343        .await
344    }
345
346    /// Returns the active user count for the specified model.
347    pub async fn get_active_user_count(
348        &self,
349        provider: LanguageModelProvider,
350        model_name: &str,
351        now: DateTimeUtc,
352    ) -> Result<ActiveUserCount> {
353        self.transaction(|tx| async move {
354            let minute_since = now - Duration::minutes(5);
355            let day_since = now - Duration::days(5);
356
357            let model = self
358                .models
359                .get(&(provider, model_name.to_string()))
360                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
361
362            let tokens_per_minute = self.usage_measure_ids[&UsageMeasure::TokensPerMinute];
363
364            let users_in_recent_minutes = usage::Entity::find()
365                .filter(
366                    usage::Column::ModelId
367                        .eq(model.id)
368                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
369                        .and(usage::Column::Timestamp.gte(minute_since.naive_utc()))
370                        .and(usage::Column::IsStaff.eq(false)),
371                )
372                .select_only()
373                .column(usage::Column::UserId)
374                .group_by(usage::Column::UserId)
375                .count(&*tx)
376                .await? as usize;
377
378            let users_in_recent_days = usage::Entity::find()
379                .filter(
380                    usage::Column::ModelId
381                        .eq(model.id)
382                        .and(usage::Column::MeasureId.eq(tokens_per_minute))
383                        .and(usage::Column::Timestamp.gte(day_since.naive_utc()))
384                        .and(usage::Column::IsStaff.eq(false)),
385                )
386                .select_only()
387                .column(usage::Column::UserId)
388                .group_by(usage::Column::UserId)
389                .count(&*tx)
390                .await? as usize;
391
392            Ok(ActiveUserCount {
393                users_in_recent_minutes,
394                users_in_recent_days,
395            })
396        })
397        .await
398    }
399
400    #[allow(clippy::too_many_arguments)]
401    async fn update_usage_for_measure(
402        &self,
403        user_id: UserId,
404        is_staff: bool,
405        model_id: ModelId,
406        usages: &[usage::Model],
407        usage_measure: UsageMeasure,
408        now: DateTimeUtc,
409        usage_to_add: usize,
410        tx: &DatabaseTransaction,
411    ) -> Result<usize> {
412        let now = now.naive_utc();
413        let measure_id = *self
414            .usage_measure_ids
415            .get(&usage_measure)
416            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
417
418        let mut id = None;
419        let mut timestamp = now;
420        let mut buckets = vec![0_i64];
421
422        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
423            id = Some(old_usage.id);
424            let (live_buckets, buckets_since) =
425                Self::get_live_buckets(old_usage, now, usage_measure);
426            if !live_buckets.is_empty() {
427                buckets.clear();
428                buckets.extend_from_slice(live_buckets);
429                buckets.extend(iter::repeat(0).take(buckets_since));
430                timestamp =
431                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
432            }
433        }
434
435        *buckets.last_mut().unwrap() += usage_to_add as i64;
436        let total_usage = buckets.iter().sum::<i64>() as usize;
437
438        let mut model = usage::ActiveModel {
439            user_id: ActiveValue::set(user_id),
440            is_staff: ActiveValue::set(is_staff),
441            model_id: ActiveValue::set(model_id),
442            measure_id: ActiveValue::set(measure_id),
443            timestamp: ActiveValue::set(timestamp),
444            buckets: ActiveValue::set(buckets),
445            ..Default::default()
446        };
447
448        if let Some(id) = id {
449            model.id = ActiveValue::unchanged(id);
450            model.update(tx).await?;
451        } else {
452            usage::Entity::insert(model)
453                .exec_without_returning(tx)
454                .await?;
455        }
456
457        Ok(total_usage)
458    }
459
460    fn get_usage_for_measure(
461        &self,
462        usages: &[usage::Model],
463        now: DateTimeUtc,
464        usage_measure: UsageMeasure,
465    ) -> Result<usize> {
466        let now = now.naive_utc();
467        let measure_id = *self
468            .usage_measure_ids
469            .get(&usage_measure)
470            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
471        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
472            return Ok(0);
473        };
474
475        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
476        Ok(live_buckets.iter().sum::<i64>() as _)
477    }
478
479    fn get_live_buckets(
480        usage: &usage::Model,
481        now: chrono::NaiveDateTime,
482        measure: UsageMeasure,
483    ) -> (&[i64], usize) {
484        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
485        let buckets_since_usage =
486            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
487        let buckets_since_usage = buckets_since_usage.ceil() as usize;
488        let mut live_buckets = &[] as &[i64];
489        if buckets_since_usage < measure.bucket_count() {
490            let expired_bucket_count =
491                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
492            live_buckets = &usage.buckets[expired_bucket_count..];
493            while live_buckets.first() == Some(&0) {
494                live_buckets = &live_buckets[1..];
495            }
496        }
497        (live_buckets, buckets_since_usage)
498    }
499}
500
501fn calculate_spending(
502    model: &model::Model,
503    input_tokens_this_month: usize,
504    output_tokens_this_month: usize,
505) -> usize {
506    let input_token_cost =
507        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
508    let output_token_cost =
509        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
510    input_token_cost + output_token_cost
511}
512
513const MINUTE_BUCKET_COUNT: usize = 12;
514const DAY_BUCKET_COUNT: usize = 48;
515const MONTH_BUCKET_COUNT: usize = 30;
516
517impl UsageMeasure {
518    fn bucket_count(&self) -> usize {
519        match self {
520            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
521            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
522            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
523            UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
524            UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
525        }
526    }
527
528    fn total_duration(&self) -> Duration {
529        match self {
530            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
531            UsageMeasure::TokensPerMinute => Duration::minutes(1),
532            UsageMeasure::TokensPerDay => Duration::hours(24),
533            UsageMeasure::InputTokensPerMonth => Duration::days(30),
534            UsageMeasure::OutputTokensPerMonth => Duration::days(30),
535        }
536    }
537
538    fn bucket_duration(&self) -> Duration {
539        self.total_duration() / self.bucket_count() as i32
540    }
541}