From 4c46872ab73cec9002f9d6e1656d24de016b2adc Mon Sep 17 00:00:00 2001 From: Oleksiy Syvokon Date: Thu, 8 Jan 2026 12:59:03 +0200 Subject: [PATCH] ep: Handle errored requests in Anthropic batches (#46351) Also, save all requests in a single sqlite transaction -- much faster. Release Notes: - N/A --- crates/anthropic/src/batches.rs | 9 +++- .../src/anthropic_client.rs | 44 +++++++++++++++++-- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/crates/anthropic/src/batches.rs b/crates/anthropic/src/batches.rs index 5fb594348d45c84e8c246c2611f7cde3aa77a18d..32b45c6ffe78c70916477317f94430642ab2d3b0 100644 --- a/crates/anthropic/src/batches.rs +++ b/crates/anthropic/src/batches.rs @@ -46,13 +46,20 @@ pub enum BatchResult { #[serde(rename = "succeeded")] Succeeded { message: Response }, #[serde(rename = "errored")] - Errored { error: ApiError }, + Errored { error: BatchErrorResponse }, #[serde(rename = "canceled")] Canceled, #[serde(rename = "expired")] Expired, } +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchErrorResponse { + #[serde(rename = "type")] + pub response_type: String, + pub error: ApiError, +} + #[derive(Debug, Serialize, Deserialize)] pub struct BatchIndividualResponse { pub custom_id: String, diff --git a/crates/edit_prediction_cli/src/anthropic_client.rs b/crates/edit_prediction_cli/src/anthropic_client.rs index e70f834223b5670782d45040b390e49b524bb82f..4ba826617c40e2e5f71530e5b25eeb42e1351bb9 100644 --- a/crates/edit_prediction_cli/src/anthropic_client.rs +++ b/crates/edit_prediction_cli/src/anthropic_client.rs @@ -304,6 +304,7 @@ impl BatchingLlmClient { .await .map_err(|e| anyhow::anyhow!("{:?}", e))?; + let mut updates: Vec<(String, String)> = Vec::new(); let mut success_count = 0; for result in results { let request_hash = result @@ -315,21 +316,58 @@ impl BatchingLlmClient { match result.result { anthropic::batches::BatchResult::Succeeded { message } => { let response_json = serde_json::to_string(&message)?; - let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?); - self.connection.exec_bound(q)?((response_json, request_hash))?; + updates.push((response_json, request_hash)); success_count += 1; } anthropic::batches::BatchResult::Errored { error } => { - log::error!("Batch request {} failed: {:?}", request_hash, error); + log::error!( + "Batch request {} failed: {}: {}", + request_hash, + error.error.error_type, + error.error.message + ); + let error_json = serde_json::json!({ + "error": { + "type": error.error.error_type, + "message": error.error.message + } + }) + .to_string(); + updates.push((error_json, request_hash)); } anthropic::batches::BatchResult::Canceled => { log::warn!("Batch request {} was canceled", request_hash); + let error_json = serde_json::json!({ + "error": { + "type": "canceled", + "message": "Batch request was canceled" + } + }) + .to_string(); + updates.push((error_json, request_hash)); } anthropic::batches::BatchResult::Expired => { log::warn!("Batch request {} expired", request_hash); + let error_json = serde_json::json!({ + "error": { + "type": "expired", + "message": "Batch request expired" + } + }) + .to_string(); + updates.push((error_json, request_hash)); } } } + + self.connection.with_savepoint("batch_download", || { + let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?); + let mut exec = self.connection.exec_bound::<(&str, &str)>(q)?; + for (response_json, request_hash) in &updates { + exec((response_json.as_str(), request_hash.as_str()))?; + } + Ok(()) + })?; log::info!("Downloaded {} successful requests", success_count); } }