Handle `upstream_http_error` completion responses (#34573)

Richard Feldman created

Addresses upstream errors such as:
<img width="831" height="100" alt="Screenshot 2025-07-16 at 3 37 03 PM"
src="https://github.com/user-attachments/assets/2aeb0257-6761-4148-b687-25fae93c68d8"
/>

These should now automatically retry like other upstream HTTP error
codes.

Release Notes:

- N/A

Change summary

crates/language_model/src/language_model.rs | 128 +++++++++++++++++++++++
1 file changed, 128 insertions(+)

Detailed changes

crates/language_model/src/language_model.rs 🔗

@@ -178,6 +178,21 @@ pub enum LanguageModelCompletionError {
 }
 
 impl LanguageModelCompletionError {
+    fn parse_upstream_error_json(message: &str) -> Option<(StatusCode, String)> {
+        let error_json = serde_json::from_str::<serde_json::Value>(message).ok()?;
+        let upstream_status = error_json
+            .get("upstream_status")
+            .and_then(|v| v.as_u64())
+            .and_then(|status| u16::try_from(status).ok())
+            .and_then(|status| StatusCode::from_u16(status).ok())?;
+        let inner_message = error_json
+            .get("message")
+            .and_then(|v| v.as_str())
+            .unwrap_or(message)
+            .to_string();
+        Some((upstream_status, inner_message))
+    }
+
     pub fn from_cloud_failure(
         upstream_provider: LanguageModelProviderName,
         code: String,
@@ -191,6 +206,18 @@ impl LanguageModelCompletionError {
             Self::PromptTooLarge {
                 tokens: Some(tokens),
             }
+        } else if code == "upstream_http_error" {
+            if let Some((upstream_status, inner_message)) =
+                Self::parse_upstream_error_json(&message)
+            {
+                return Self::from_http_status(
+                    upstream_provider,
+                    upstream_status,
+                    inner_message,
+                    retry_after,
+                );
+            }
+            anyhow!("completion request failed, code: {code}, message: {message}").into()
         } else if let Some(status_code) = code
             .strip_prefix("upstream_http_")
             .and_then(|code| StatusCode::from_str(code).ok())
@@ -701,3 +728,104 @@ impl From<String> for LanguageModelProviderName {
         Self(SharedString::from(value))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_from_cloud_failure_with_upstream_http_error() {
+        let error = LanguageModelCompletionError::from_cloud_failure(
+            String::from("anthropic").into(),
+            "upstream_http_error".to_string(),
+            r#"{"code":"upstream_http_error","message":"Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers. reset reason: connection timeout","upstream_status":503}"#.to_string(),
+            None,
+        );
+
+        match error {
+            LanguageModelCompletionError::ServerOverloaded { provider, .. } => {
+                assert_eq!(provider.0, "anthropic");
+            }
+            _ => panic!(
+                "Expected ServerOverloaded error for 503 status, got: {:?}",
+                error
+            ),
+        }
+
+        let error = LanguageModelCompletionError::from_cloud_failure(
+            String::from("anthropic").into(),
+            "upstream_http_error".to_string(),
+            r#"{"code":"upstream_http_error","message":"Internal server error","upstream_status":500}"#.to_string(),
+            None,
+        );
+
+        match error {
+            LanguageModelCompletionError::ApiInternalServerError { provider, message } => {
+                assert_eq!(provider.0, "anthropic");
+                assert_eq!(message, "Internal server error");
+            }
+            _ => panic!(
+                "Expected ApiInternalServerError for 500 status, got: {:?}",
+                error
+            ),
+        }
+    }
+
+    #[test]
+    fn test_from_cloud_failure_with_standard_format() {
+        let error = LanguageModelCompletionError::from_cloud_failure(
+            String::from("anthropic").into(),
+            "upstream_http_503".to_string(),
+            "Service unavailable".to_string(),
+            None,
+        );
+
+        match error {
+            LanguageModelCompletionError::ServerOverloaded { provider, .. } => {
+                assert_eq!(provider.0, "anthropic");
+            }
+            _ => panic!("Expected ServerOverloaded error for upstream_http_503"),
+        }
+    }
+
+    #[test]
+    fn test_upstream_http_error_connection_timeout() {
+        let error = LanguageModelCompletionError::from_cloud_failure(
+            String::from("anthropic").into(),
+            "upstream_http_error".to_string(),
+            r#"{"code":"upstream_http_error","message":"Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers. reset reason: connection timeout","upstream_status":503}"#.to_string(),
+            None,
+        );
+
+        match error {
+            LanguageModelCompletionError::ServerOverloaded { provider, .. } => {
+                assert_eq!(provider.0, "anthropic");
+            }
+            _ => panic!(
+                "Expected ServerOverloaded error for connection timeout with 503 status, got: {:?}",
+                error
+            ),
+        }
+
+        let error = LanguageModelCompletionError::from_cloud_failure(
+            String::from("anthropic").into(),
+            "upstream_http_error".to_string(),
+            r#"{"code":"upstream_http_error","message":"Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers. reset reason: connection timeout","upstream_status":500}"#.to_string(),
+            None,
+        );
+
+        match error {
+            LanguageModelCompletionError::ApiInternalServerError { provider, message } => {
+                assert_eq!(provider.0, "anthropic");
+                assert_eq!(
+                    message,
+                    "Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers. reset reason: connection timeout"
+                );
+            }
+            _ => panic!(
+                "Expected ApiInternalServerError for connection timeout with 500 status, got: {:?}",
+                error
+            ),
+        }
+    }
+}