usages.rs

  1use crate::db::UserId;
  2use chrono::Duration;
  3use rpc::LanguageModelProvider;
  4use sea_orm::QuerySelect;
  5use std::{iter, str::FromStr};
  6use strum::IntoEnumIterator as _;
  7
  8use super::*;
  9
 /// Snapshot of one user's consumption of a single language model.
 ///
 /// Produced by `LlmDatabase::get_usage` and `LlmDatabase::record_usage`.
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub struct Usage {
     /// Live total of the `RequestsPerMinute` measure.
     pub requests_this_minute: usize,
     /// Live total of the `TokensPerMinute` measure.
     pub tokens_this_minute: usize,
     /// Live total of the `TokensPerDay` measure.
     pub tokens_this_day: usize,
     /// Live total of the `InputTokensPerMonth` measure.
     pub input_tokens_this_month: usize,
     /// Live total of the `OutputTokensPerMonth` measure.
     pub output_tokens_this_month: usize,
     /// Cost derived from the two monthly token totals and the model's
     /// per-million-token prices.
     pub spending_this_month: usize,
     /// Cost derived from the lifetime token totals and the model's
     /// per-million-token prices.
     pub lifetime_spending: usize,
 }
 20
 /// Counts of recently active users, as reported by
 /// `LlmDatabase::get_active_user_count`.
 #[derive(Debug, Default, Clone, Copy)]
 pub struct ActiveUserCount {
     /// Users with a non-staff usage row stamped within the last 5 minutes.
     pub users_in_recent_minutes: usize,
     /// Users with a non-staff usage row stamped within the last 5 days.
     pub users_in_recent_days: usize,
 }
 26
 27impl LlmDatabase {
 28    pub async fn initialize_usage_measures(&mut self) -> Result<()> {
 29        let all_measures = self
 30            .transaction(|tx| async move {
 31                let existing_measures = usage_measure::Entity::find().all(&*tx).await?;
 32
 33                let new_measures = UsageMeasure::iter()
 34                    .filter(|measure| {
 35                        !existing_measures
 36                            .iter()
 37                            .any(|m| m.name == measure.to_string())
 38                    })
 39                    .map(|measure| usage_measure::ActiveModel {
 40                        name: ActiveValue::set(measure.to_string()),
 41                        ..Default::default()
 42                    })
 43                    .collect::<Vec<_>>();
 44
 45                if !new_measures.is_empty() {
 46                    usage_measure::Entity::insert_many(new_measures)
 47                        .exec(&*tx)
 48                        .await?;
 49                }
 50
 51                Ok(usage_measure::Entity::find().all(&*tx).await?)
 52            })
 53            .await?;
 54
 55        self.usage_measure_ids = all_measures
 56            .into_iter()
 57            .filter_map(|measure| {
 58                UsageMeasure::from_str(&measure.name)
 59                    .ok()
 60                    .map(|um| (um, measure.id))
 61            })
 62            .collect();
 63        Ok(())
 64    }
 65
 66    pub async fn get_usage(
 67        &self,
 68        user_id: UserId,
 69        provider: LanguageModelProvider,
 70        model_name: &str,
 71        now: DateTimeUtc,
 72    ) -> Result<Usage> {
 73        self.transaction(|tx| async move {
 74            let model = self
 75                .models
 76                .get(&(provider, model_name.to_string()))
 77                .ok_or_else(|| anyhow!("unknown model {provider}:{model_name}"))?;
 78
 79            let usages = usage::Entity::find()
 80                .filter(
 81                    usage::Column::UserId
 82                        .eq(user_id)
 83                        .and(usage::Column::ModelId.eq(model.id)),
 84                )
 85                .all(&*tx)
 86                .await?;
 87
 88            let (lifetime_input_tokens, lifetime_output_tokens) = lifetime_usage::Entity::find()
 89                .filter(
 90                    lifetime_usage::Column::UserId
 91                        .eq(user_id)
 92                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
 93                )
 94                .one(&*tx)
 95                .await?
 96                .map_or((0, 0), |usage| {
 97                    (usage.input_tokens as usize, usage.output_tokens as usize)
 98                });
 99
100            let requests_this_minute =
101                self.get_usage_for_measure(&usages, now, UsageMeasure::RequestsPerMinute)?;
102            let tokens_this_minute =
103                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerMinute)?;
104            let tokens_this_day =
105                self.get_usage_for_measure(&usages, now, UsageMeasure::TokensPerDay)?;
106            let input_tokens_this_month =
107                self.get_usage_for_measure(&usages, now, UsageMeasure::InputTokensPerMonth)?;
108            let output_tokens_this_month =
109                self.get_usage_for_measure(&usages, now, UsageMeasure::OutputTokensPerMonth)?;
110            let spending_this_month =
111                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
112            let lifetime_spending =
113                calculate_spending(model, lifetime_input_tokens, lifetime_output_tokens);
114
115            Ok(Usage {
116                requests_this_minute,
117                tokens_this_minute,
118                tokens_this_day,
119                input_tokens_this_month,
120                output_tokens_this_month,
121                spending_this_month,
122                lifetime_spending,
123            })
124        })
125        .await
126    }
127
128    #[allow(clippy::too_many_arguments)]
129    pub async fn record_usage(
130        &self,
131        user_id: UserId,
132        is_staff: bool,
133        provider: LanguageModelProvider,
134        model_name: &str,
135        input_token_count: usize,
136        output_token_count: usize,
137        now: DateTimeUtc,
138    ) -> Result<Usage> {
139        self.transaction(|tx| async move {
140            let model = self.model(provider, model_name)?;
141
142            let usages = usage::Entity::find()
143                .filter(
144                    usage::Column::UserId
145                        .eq(user_id)
146                        .and(usage::Column::ModelId.eq(model.id)),
147                )
148                .all(&*tx)
149                .await?;
150
151            let requests_this_minute = self
152                .update_usage_for_measure(
153                    user_id,
154                    is_staff,
155                    model.id,
156                    &usages,
157                    UsageMeasure::RequestsPerMinute,
158                    now,
159                    1,
160                    &tx,
161                )
162                .await?;
163            let tokens_this_minute = self
164                .update_usage_for_measure(
165                    user_id,
166                    is_staff,
167                    model.id,
168                    &usages,
169                    UsageMeasure::TokensPerMinute,
170                    now,
171                    input_token_count + output_token_count,
172                    &tx,
173                )
174                .await?;
175            let tokens_this_day = self
176                .update_usage_for_measure(
177                    user_id,
178                    is_staff,
179                    model.id,
180                    &usages,
181                    UsageMeasure::TokensPerDay,
182                    now,
183                    input_token_count + output_token_count,
184                    &tx,
185                )
186                .await?;
187            let input_tokens_this_month = self
188                .update_usage_for_measure(
189                    user_id,
190                    is_staff,
191                    model.id,
192                    &usages,
193                    UsageMeasure::InputTokensPerMonth,
194                    now,
195                    input_token_count,
196                    &tx,
197                )
198                .await?;
199            let output_tokens_this_month = self
200                .update_usage_for_measure(
201                    user_id,
202                    is_staff,
203                    model.id,
204                    &usages,
205                    UsageMeasure::OutputTokensPerMonth,
206                    now,
207                    output_token_count,
208                    &tx,
209                )
210                .await?;
211            let spending_this_month =
212                calculate_spending(model, input_tokens_this_month, output_tokens_this_month);
213
214            // Update lifetime usage
215            let lifetime_usage = lifetime_usage::Entity::find()
216                .filter(
217                    lifetime_usage::Column::UserId
218                        .eq(user_id)
219                        .and(lifetime_usage::Column::ModelId.eq(model.id)),
220                )
221                .one(&*tx)
222                .await?;
223
224            let lifetime_usage = match lifetime_usage {
225                Some(usage) => {
226                    lifetime_usage::Entity::update(lifetime_usage::ActiveModel {
227                        id: ActiveValue::unchanged(usage.id),
228                        input_tokens: ActiveValue::set(
229                            usage.input_tokens + input_token_count as i64,
230                        ),
231                        output_tokens: ActiveValue::set(
232                            usage.output_tokens + output_token_count as i64,
233                        ),
234                        ..Default::default()
235                    })
236                    .exec(&*tx)
237                    .await?
238                }
239                None => {
240                    lifetime_usage::ActiveModel {
241                        user_id: ActiveValue::set(user_id),
242                        model_id: ActiveValue::set(model.id),
243                        input_tokens: ActiveValue::set(input_token_count as i64),
244                        output_tokens: ActiveValue::set(output_token_count as i64),
245                        ..Default::default()
246                    }
247                    .insert(&*tx)
248                    .await?
249                }
250            };
251
252            let lifetime_spending = calculate_spending(
253                model,
254                lifetime_usage.input_tokens as usize,
255                lifetime_usage.output_tokens as usize,
256            );
257
258            Ok(Usage {
259                requests_this_minute,
260                tokens_this_minute,
261                tokens_this_day,
262                input_tokens_this_month,
263                output_tokens_this_month,
264                spending_this_month,
265                lifetime_spending,
266            })
267        })
268        .await
269    }
270
271    pub async fn get_active_user_count(&self, now: DateTimeUtc) -> Result<ActiveUserCount> {
272        self.transaction(|tx| async move {
273            let minute_since = now - Duration::minutes(5);
274            let day_since = now - Duration::days(5);
275
276            let users_in_recent_minutes = usage::Entity::find()
277                .filter(
278                    usage::Column::Timestamp
279                        .gte(minute_since.naive_utc())
280                        .and(usage::Column::IsStaff.eq(false)),
281                )
282                .select_only()
283                .column(usage::Column::UserId)
284                .group_by(usage::Column::UserId)
285                .count(&*tx)
286                .await? as usize;
287
288            let users_in_recent_days = usage::Entity::find()
289                .filter(
290                    usage::Column::Timestamp
291                        .gte(day_since.naive_utc())
292                        .and(usage::Column::IsStaff.eq(false)),
293                )
294                .select_only()
295                .column(usage::Column::UserId)
296                .group_by(usage::Column::UserId)
297                .count(&*tx)
298                .await? as usize;
299
300            Ok(ActiveUserCount {
301                users_in_recent_minutes,
302                users_in_recent_days,
303            })
304        })
305        .await
306    }
307
308    #[allow(clippy::too_many_arguments)]
309    async fn update_usage_for_measure(
310        &self,
311        user_id: UserId,
312        is_staff: bool,
313        model_id: ModelId,
314        usages: &[usage::Model],
315        usage_measure: UsageMeasure,
316        now: DateTimeUtc,
317        usage_to_add: usize,
318        tx: &DatabaseTransaction,
319    ) -> Result<usize> {
320        let now = now.naive_utc();
321        let measure_id = *self
322            .usage_measure_ids
323            .get(&usage_measure)
324            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
325
326        let mut id = None;
327        let mut timestamp = now;
328        let mut buckets = vec![0_i64];
329
330        if let Some(old_usage) = usages.iter().find(|usage| usage.measure_id == measure_id) {
331            id = Some(old_usage.id);
332            let (live_buckets, buckets_since) =
333                Self::get_live_buckets(old_usage, now, usage_measure);
334            if !live_buckets.is_empty() {
335                buckets.clear();
336                buckets.extend_from_slice(live_buckets);
337                buckets.extend(iter::repeat(0).take(buckets_since));
338                timestamp =
339                    old_usage.timestamp + (usage_measure.bucket_duration() * buckets_since as i32);
340            }
341        }
342
343        *buckets.last_mut().unwrap() += usage_to_add as i64;
344        let total_usage = buckets.iter().sum::<i64>() as usize;
345
346        let mut model = usage::ActiveModel {
347            user_id: ActiveValue::set(user_id),
348            is_staff: ActiveValue::set(is_staff),
349            model_id: ActiveValue::set(model_id),
350            measure_id: ActiveValue::set(measure_id),
351            timestamp: ActiveValue::set(timestamp),
352            buckets: ActiveValue::set(buckets),
353            ..Default::default()
354        };
355
356        if let Some(id) = id {
357            model.id = ActiveValue::unchanged(id);
358            model.update(tx).await?;
359        } else {
360            usage::Entity::insert(model)
361                .exec_without_returning(tx)
362                .await?;
363        }
364
365        Ok(total_usage)
366    }
367
368    fn get_usage_for_measure(
369        &self,
370        usages: &[usage::Model],
371        now: DateTimeUtc,
372        usage_measure: UsageMeasure,
373    ) -> Result<usize> {
374        let now = now.naive_utc();
375        let measure_id = *self
376            .usage_measure_ids
377            .get(&usage_measure)
378            .ok_or_else(|| anyhow!("usage measure {usage_measure} not found"))?;
379        let Some(usage) = usages.iter().find(|usage| usage.measure_id == measure_id) else {
380            return Ok(0);
381        };
382
383        let (live_buckets, _) = Self::get_live_buckets(usage, now, usage_measure);
384        Ok(live_buckets.iter().sum::<i64>() as _)
385    }
386
387    fn get_live_buckets(
388        usage: &usage::Model,
389        now: chrono::NaiveDateTime,
390        measure: UsageMeasure,
391    ) -> (&[i64], usize) {
392        let seconds_since_usage = (now - usage.timestamp).num_seconds().max(0);
393        let buckets_since_usage =
394            seconds_since_usage as f32 / measure.bucket_duration().num_seconds() as f32;
395        let buckets_since_usage = buckets_since_usage.ceil() as usize;
396        let mut live_buckets = &[] as &[i64];
397        if buckets_since_usage < measure.bucket_count() {
398            let expired_bucket_count =
399                (usage.buckets.len() + buckets_since_usage).saturating_sub(measure.bucket_count());
400            live_buckets = &usage.buckets[expired_bucket_count..];
401            while live_buckets.first() == Some(&0) {
402                live_buckets = &live_buckets[1..];
403            }
404        }
405        (live_buckets, buckets_since_usage)
406    }
407}
408
409fn calculate_spending(
410    model: &model::Model,
411    input_tokens_this_month: usize,
412    output_tokens_this_month: usize,
413) -> usize {
414    let input_token_cost =
415        input_tokens_this_month * model.price_per_million_input_tokens as usize / 1_000_000;
416    let output_token_cost =
417        output_tokens_this_month * model.price_per_million_output_tokens as usize / 1_000_000;
418    input_token_cost + output_token_cost
419}
420
421const MINUTE_BUCKET_COUNT: usize = 12;
422const DAY_BUCKET_COUNT: usize = 48;
423const MONTH_BUCKET_COUNT: usize = 30;
424
425impl UsageMeasure {
426    fn bucket_count(&self) -> usize {
427        match self {
428            UsageMeasure::RequestsPerMinute => MINUTE_BUCKET_COUNT,
429            UsageMeasure::TokensPerMinute => MINUTE_BUCKET_COUNT,
430            UsageMeasure::TokensPerDay => DAY_BUCKET_COUNT,
431            UsageMeasure::InputTokensPerMonth => MONTH_BUCKET_COUNT,
432            UsageMeasure::OutputTokensPerMonth => MONTH_BUCKET_COUNT,
433        }
434    }
435
436    fn total_duration(&self) -> Duration {
437        match self {
438            UsageMeasure::RequestsPerMinute => Duration::minutes(1),
439            UsageMeasure::TokensPerMinute => Duration::minutes(1),
440            UsageMeasure::TokensPerDay => Duration::hours(24),
441            UsageMeasure::InputTokensPerMonth => Duration::days(30),
442            UsageMeasure::OutputTokensPerMonth => Duration::days(30),
443        }
444    }
445
446    fn bucket_duration(&self) -> Duration {
447        self.total_duration() / self.bucket_count() as i32
448    }
449}