ep: Handle errored requests in Anthropic batches (#46351)

Oleksiy Syvokon created

Also, save all requests in a single sqlite transaction -- much faster.

Release Notes:

- N/A

Change summary

crates/anthropic/src/batches.rs                    |  9 ++
crates/edit_prediction_cli/src/anthropic_client.rs | 44 ++++++++++++++-
2 files changed, 49 insertions(+), 4 deletions(-)

Detailed changes

crates/anthropic/src/batches.rs 🔗

@@ -46,13 +46,20 @@ pub enum BatchResult {
     #[serde(rename = "succeeded")]
     Succeeded { message: Response },
     #[serde(rename = "errored")]
-    Errored { error: ApiError },
+    Errored { error: BatchErrorResponse },
     #[serde(rename = "canceled")]
     Canceled,
     #[serde(rename = "expired")]
     Expired,
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BatchErrorResponse {
+    #[serde(rename = "type")]
+    pub response_type: String,
+    pub error: ApiError,
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct BatchIndividualResponse {
     pub custom_id: String,

crates/edit_prediction_cli/src/anthropic_client.rs 🔗

@@ -304,6 +304,7 @@ impl BatchingLlmClient {
                 .await
                 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
 
+                let mut updates: Vec<(String, String)> = Vec::new();
                 let mut success_count = 0;
                 for result in results {
                     let request_hash = result
@@ -315,21 +316,58 @@ impl BatchingLlmClient {
                     match result.result {
                         anthropic::batches::BatchResult::Succeeded { message } => {
                             let response_json = serde_json::to_string(&message)?;
-                            let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?);
-                            self.connection.exec_bound(q)?((response_json, request_hash))?;
+                            updates.push((response_json, request_hash));
                             success_count += 1;
                         }
                         anthropic::batches::BatchResult::Errored { error } => {
-                            log::error!("Batch request {} failed: {:?}", request_hash, error);
+                            log::error!(
+                                "Batch request {} failed: {}: {}",
+                                request_hash,
+                                error.error.error_type,
+                                error.error.message
+                            );
+                            let error_json = serde_json::json!({
+                                "error": {
+                                    "type": error.error.error_type,
+                                    "message": error.error.message
+                                }
+                            })
+                            .to_string();
+                            updates.push((error_json, request_hash));
                         }
                         anthropic::batches::BatchResult::Canceled => {
                             log::warn!("Batch request {} was canceled", request_hash);
+                            let error_json = serde_json::json!({
+                                "error": {
+                                    "type": "canceled",
+                                    "message": "Batch request was canceled"
+                                }
+                            })
+                            .to_string();
+                            updates.push((error_json, request_hash));
                         }
                         anthropic::batches::BatchResult::Expired => {
                             log::warn!("Batch request {} expired", request_hash);
+                            let error_json = serde_json::json!({
+                                "error": {
+                                    "type": "expired",
+                                    "message": "Batch request expired"
+                                }
+                            })
+                            .to_string();
+                            updates.push((error_json, request_hash));
                         }
                     }
                 }
+
+                self.connection.with_savepoint("batch_download", || {
+                    let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?);
+                    let mut exec = self.connection.exec_bound::<(&str, &str)>(q)?;
+                    for (response_json, request_hash) in &updates {
+                        exec((response_json.as_str(), request_hash.as_str()))?;
+                    }
+                    Ok(())
+                })?;
                 log::info!("Downloaded {} successful requests", success_count);
             }
         }