collab: Capture upstream input/output rate limits from Anthropic (#28106)

Marshall Bowers created

This PR makes it so we capture the upstream rate limit information from
Anthropic for input and output tokens.

Release Notes:

- N/A

Change summary

crates/anthropic/src/anthropic.rs | 70 ++++++++++++++++++++------------
crates/collab/src/llm.rs          | 12 +++-
2 files changed, 51 insertions(+), 31 deletions(-)

Detailed changes

crates/anthropic/src/anthropic.rs 🔗

@@ -321,38 +321,54 @@ pub async fn stream_completion(
         .map(|output| output.0)
 }
 
+/// An individual rate limit.
+#[derive(Debug)]
+pub struct RateLimit {
+    pub limit: usize,
+    pub remaining: usize,
+    pub reset: DateTime<Utc>,
+}
+
+impl RateLimit {
+    fn from_headers(resource: &str, headers: &HeaderMap<HeaderValue>) -> Result<Self> {
+        let limit =
+            get_header(&format!("anthropic-ratelimit-{resource}-limit"), headers)?.parse()?;
+        let remaining = get_header(
+            &format!("anthropic-ratelimit-{resource}-remaining"),
+            headers,
+        )?
+        .parse()?;
+        let reset = DateTime::parse_from_rfc3339(get_header(
+            &format!("anthropic-ratelimit-{resource}-reset"),
+            headers,
+        )?)?
+        .to_utc();
+
+        Ok(Self {
+            limit,
+            remaining,
+            reset,
+        })
+    }
+}
+
 /// <https://docs.anthropic.com/en/api/rate-limits#response-headers>
 #[derive(Debug)]
 pub struct RateLimitInfo {
-    pub requests_limit: usize,
-    pub requests_remaining: usize,
-    pub requests_reset: DateTime<Utc>,
-    pub tokens_limit: usize,
-    pub tokens_remaining: usize,
-    pub tokens_reset: DateTime<Utc>,
+    pub requests: Option<RateLimit>,
+    pub tokens: Option<RateLimit>,
+    pub input_tokens: Option<RateLimit>,
+    pub output_tokens: Option<RateLimit>,
 }
 
 impl RateLimitInfo {
-    fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
-        let tokens_limit = get_header("anthropic-ratelimit-tokens-limit", headers)?.parse()?;
-        let requests_limit = get_header("anthropic-ratelimit-requests-limit", headers)?.parse()?;
-        let tokens_remaining =
-            get_header("anthropic-ratelimit-tokens-remaining", headers)?.parse()?;
-        let requests_remaining =
-            get_header("anthropic-ratelimit-requests-remaining", headers)?.parse()?;
-        let requests_reset = get_header("anthropic-ratelimit-requests-reset", headers)?;
-        let tokens_reset = get_header("anthropic-ratelimit-tokens-reset", headers)?;
-        let requests_reset = DateTime::parse_from_rfc3339(requests_reset)?.to_utc();
-        let tokens_reset = DateTime::parse_from_rfc3339(tokens_reset)?.to_utc();
-
-        Ok(Self {
-            requests_limit,
-            tokens_limit,
-            requests_remaining,
-            tokens_remaining,
-            requests_reset,
-            tokens_reset,
-        })
+    fn from_headers(headers: &HeaderMap<HeaderValue>) -> Self {
+        Self {
+            requests: RateLimit::from_headers("requests", headers).log_err(),
+            tokens: RateLimit::from_headers("tokens", headers).log_err(),
+            input_tokens: RateLimit::from_headers("input-tokens", headers).log_err(),
+            output_tokens: RateLimit::from_headers("output-tokens", headers).log_err(),
+        }
     }
 }
 
@@ -418,7 +434,7 @@ pub async fn stream_completion_with_rate_limit_info(
                 }
             })
             .boxed();
-        Ok((stream, rate_limits.log_err()))
+        Ok((stream, Some(rate_limits)))
     } else {
         let mut body = Vec::new();
         response

crates/collab/src/llm.rs 🔗

@@ -316,10 +316,14 @@ async fn perform_completion(
                     is_staff = claims.is_staff,
                     provider = params.provider.to_string(),
                     model = model,
-                    tokens_remaining = rate_limit_info.tokens_remaining,
-                    requests_remaining = rate_limit_info.requests_remaining,
-                    requests_reset = ?rate_limit_info.requests_reset,
-                    tokens_reset = ?rate_limit_info.tokens_reset,
+                    tokens_remaining = rate_limit_info.tokens.as_ref().map(|limits| limits.remaining),
+                    input_tokens_remaining = rate_limit_info.input_tokens.as_ref().map(|limits| limits.remaining),
+                    output_tokens_remaining = rate_limit_info.output_tokens.as_ref().map(|limits| limits.remaining),
+                    requests_remaining = rate_limit_info.requests.as_ref().map(|limits| limits.remaining),
+                    requests_reset = ?rate_limit_info.requests.as_ref().map(|limits| limits.reset),
+                    tokens_reset = ?rate_limit_info.tokens.as_ref().map(|limits| limits.reset),
+                    input_tokens_reset = ?rate_limit_info.input_tokens.as_ref().map(|limits| limits.reset),
+                    output_tokens_reset = ?rate_limit_info.output_tokens.as_ref().map(|limits| limits.reset),
                 );
             }