Improve upstream error reporting (#34668)

Richard Feldman created

Now we handle more upstream error cases using the same auto-retry logic.

Release Notes:

- N/A

Change summary

crates/agent/src/thread.rs                     |  29 ++
crates/agent_ui/src/active_thread.rs           |   2 
crates/assistant_tools/src/edit_agent/evals.rs |  25 ++
crates/language_model/src/language_model.rs    |   6 
crates/language_models/src/provider/cloud.rs   | 206 ++++++++++++++++++++
5 files changed, 267 insertions(+), 1 deletion(-)

Detailed changes

crates/agent/src/thread.rs 🔗

@@ -2146,6 +2146,35 @@ impl Thread {
                     max_attempts: MAX_RETRY_ATTEMPTS,
                 })
             }
+            UpstreamProviderError {
+                status,
+                retry_after,
+                ..
+            } => match *status {
+                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
+                    Some(RetryStrategy::Fixed {
+                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
+                        max_attempts: MAX_RETRY_ATTEMPTS,
+                    })
+                }
+                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
+                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
+                    // Internal Server Error could be anything, so only retry once.
+                    max_attempts: 1,
+                }),
+                status => {
+                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
+                    // but we frequently get them in practice. See https://http.dev/529
+                    if status.as_u16() == 529 {
+                        Some(RetryStrategy::Fixed {
+                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
+                            max_attempts: MAX_RETRY_ATTEMPTS,
+                        })
+                    } else {
+                        None
+                    }
+                }
+            },
             ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
                 delay: BASE_RETRY_DELAY,
                 max_attempts: 1,

crates/agent_ui/src/active_thread.rs 🔗

@@ -1036,7 +1036,7 @@ impl ActiveThread {
                             .collect::<Vec<_>>()
                             .join("\n");
                         self.last_error = Some(ThreadError::Message {
-                            header: "Error interacting with language model".into(),
+                            header: "Error".into(),
                             message: error_message.into(),
                         });
                     }

crates/assistant_tools/src/edit_agent/evals.rs 🔗

@@ -12,6 +12,7 @@ use collections::HashMap;
 use fs::FakeFs;
 use futures::{FutureExt, future::LocalBoxFuture};
 use gpui::{AppContext, TestAppContext, Timer};
+use http_client::StatusCode;
 use indoc::{formatdoc, indoc};
 use language_model::{
     LanguageModelRegistry, LanguageModelRequestTool, LanguageModelToolResult,
@@ -1675,6 +1676,30 @@ async fn retry_on_rate_limit<R>(mut request: impl AsyncFnMut() -> Result<R>) ->
                         Timer::after(retry_after + jitter).await;
                         continue;
                     }
+                    LanguageModelCompletionError::UpstreamProviderError {
+                        status,
+                        retry_after,
+                        ..
+                    } => {
+                        // Only retry for specific status codes
+                        let should_retry = matches!(
+                            *status,
+                            StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE
+                        ) || status.as_u16() == 529;
+
+                        if !should_retry {
+                            return Err(err.into());
+                        }
+
+                        // Use server-provided retry_after if available, otherwise use default
+                        let retry_after = retry_after.unwrap_or(Duration::from_secs(5));
+                        let jitter = retry_after.mul_f64(rand::thread_rng().gen_range(0.0..1.0));
+                        eprintln!(
+                            "Attempt #{attempt}: {err}. Retry after {retry_after:?} + jitter of {jitter:?}"
+                        );
+                        Timer::after(retry_after + jitter).await;
+                        continue;
+                    }
                     _ => return Err(err.into()),
                 },
                 Err(err) => return Err(err),

crates/language_model/src/language_model.rs 🔗

@@ -116,6 +116,12 @@ pub enum LanguageModelCompletionError {
         provider: LanguageModelProviderName,
         message: String,
     },
+    #[error("{message}")]
+    UpstreamProviderError {
+        message: String,
+        status: StatusCode,
+        retry_after: Option<Duration>,
+    },
     #[error("HTTP response error from {provider}'s API: status {status_code} - {message:?}")]
     HttpResponseError {
         provider: LanguageModelProviderName,

crates/language_models/src/provider/cloud.rs 🔗

@@ -644,8 +644,62 @@ struct ApiError {
     headers: HeaderMap<HeaderValue>,
 }
 
+/// Represents error responses from Zed's cloud API.
+///
+/// Example JSON for an upstream HTTP error:
+/// ```json
+/// {
+///   "code": "upstream_http_error",
+///   "message": "Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers, reset reason: connection timeout",
+///   "upstream_status": 503
+/// }
+/// ```
+#[derive(Debug, serde::Deserialize)]
+struct CloudApiError {
+    code: String,
+    message: String,
+    #[serde(default)]
+    #[serde(deserialize_with = "deserialize_optional_status_code")]
+    upstream_status: Option<StatusCode>,
+    #[serde(default)]
+    retry_after: Option<f64>,
+}
+
+fn deserialize_optional_status_code<'de, D>(deserializer: D) -> Result<Option<StatusCode>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let opt: Option<u16> = Option::deserialize(deserializer)?;
+    Ok(opt.and_then(|code| StatusCode::from_u16(code).ok()))
+}
+
 impl From<ApiError> for LanguageModelCompletionError {
     fn from(error: ApiError) -> Self {
+        if let Ok(cloud_error) = serde_json::from_str::<CloudApiError>(&error.body) {
+            if cloud_error.code.starts_with("upstream_http_") {
+                let status = if let Some(status) = cloud_error.upstream_status {
+                    status
+                } else if cloud_error.code.ends_with("_error") {
+                    error.status
+                } else {
+                    // If there's a status code in the code string (e.g. "upstream_http_429")
+                    // then use that; otherwise, see if the JSON contains a status code.
+                    cloud_error
+                        .code
+                        .strip_prefix("upstream_http_")
+                        .and_then(|code_str| code_str.parse::<u16>().ok())
+                        .and_then(|code| StatusCode::from_u16(code).ok())
+                        .unwrap_or(error.status)
+                };
+
+                return LanguageModelCompletionError::UpstreamProviderError {
+                    message: cloud_error.message,
+                    status,
+                    retry_after: cloud_error.retry_after.map(Duration::from_secs_f64),
+                };
+            }
+        }
+
         let retry_after = None;
         LanguageModelCompletionError::from_http_status(
             PROVIDER_NAME,
@@ -1279,3 +1333,155 @@ impl Component for ZedAiConfiguration {
         )
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use http_client::http::{HeaderMap, StatusCode};
+    use language_model::LanguageModelCompletionError;
+
+    #[test]
+    fn test_api_error_conversion_with_upstream_http_error() {
+        // upstream_http_error with 503 status should become ServerOverloaded
+        let error_body = r#"{"code":"upstream_http_error","message":"Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers, reset reason: connection timeout","upstream_status":503}"#;
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::UpstreamProviderError { message, .. } => {
+                assert_eq!(
+                    message,
+                    "Received an error from the Anthropic API: upstream connect error or disconnect/reset before headers, reset reason: connection timeout"
+                );
+            }
+            _ => panic!(
+                "Expected UpstreamProviderError for upstream 503, got: {:?}",
+                completion_error
+            ),
+        }
+
+        // upstream_http_error with 500 status should become ApiInternalServerError
+        let error_body = r#"{"code":"upstream_http_error","message":"Received an error from the OpenAI API: internal server error","upstream_status":500}"#;
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::UpstreamProviderError { message, .. } => {
+                assert_eq!(
+                    message,
+                    "Received an error from the OpenAI API: internal server error"
+                );
+            }
+            _ => panic!(
+                "Expected UpstreamProviderError for upstream 500, got: {:?}",
+                completion_error
+            ),
+        }
+
+        // upstream_http_error with 429 status should become RateLimitExceeded
+        let error_body = r#"{"code":"upstream_http_error","message":"Received an error from the Google API: rate limit exceeded","upstream_status":429}"#;
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::UpstreamProviderError { message, .. } => {
+                assert_eq!(
+                    message,
+                    "Received an error from the Google API: rate limit exceeded"
+                );
+            }
+            _ => panic!(
+                "Expected UpstreamProviderError for upstream 429, got: {:?}",
+                completion_error
+            ),
+        }
+
+        // Regular 500 error without upstream_http_error should remain ApiInternalServerError for Zed
+        let error_body = "Regular internal server error";
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::ApiInternalServerError { provider, message } => {
+                assert_eq!(provider, PROVIDER_NAME);
+                assert_eq!(message, "Regular internal server error");
+            }
+            _ => panic!(
+                "Expected ApiInternalServerError for regular 500, got: {:?}",
+                completion_error
+            ),
+        }
+
+        // upstream_http_429 format should be converted to UpstreamProviderError
+        let error_body = r#"{"code":"upstream_http_429","message":"Upstream Anthropic rate limit exceeded.","retry_after":30.5}"#;
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::UpstreamProviderError {
+                message,
+                status,
+                retry_after,
+            } => {
+                assert_eq!(message, "Upstream Anthropic rate limit exceeded.");
+                assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);
+                assert_eq!(retry_after, Some(Duration::from_secs_f64(30.5)));
+            }
+            _ => panic!(
+                "Expected UpstreamProviderError for upstream_http_429, got: {:?}",
+                completion_error
+            ),
+        }
+
+        // Invalid JSON in error body should fall back to regular error handling
+        let error_body = "Not JSON at all";
+
+        let api_error = ApiError {
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+            body: error_body.to_string(),
+            headers: HeaderMap::new(),
+        };
+
+        let completion_error: LanguageModelCompletionError = api_error.into();
+
+        match completion_error {
+            LanguageModelCompletionError::ApiInternalServerError { provider, .. } => {
+                assert_eq!(provider, PROVIDER_NAME);
+            }
+            _ => panic!(
+                "Expected ApiInternalServerError for invalid JSON, got: {:?}",
+                completion_error
+            ),
+        }
+    }
+}