anthropic_client.rs

  1use anthropic::{
  2    ANTHROPIC_API_URL, Event, Message, Request as AnthropicRequest, RequestContent,
  3    Response as AnthropicResponse, ResponseContent, Role, non_streaming_completion,
  4    stream_completion,
  5};
  6use anyhow::Result;
  7use futures::StreamExt as _;
  8use http_client::HttpClient;
  9use indoc::indoc;
 10use reqwest_client::ReqwestClient;
 11use sqlez::bindable::Bind;
 12use sqlez::bindable::StaticColumnCount;
 13use sqlez_macros::sql;
 14use std::collections::HashSet;
 15use std::hash::Hash;
 16use std::hash::Hasher;
 17use std::path::Path;
 18use std::sync::{Arc, Mutex};
 19
/// Direct (non-batching, non-caching) client for the Anthropic API: every
/// `generate` call performs an immediate HTTP request.
pub struct PlainLlmClient {
    // Shared HTTP transport used for all API calls.
    pub http_client: Arc<dyn HttpClient>,
    // Anthropic API key, read from `ANTHROPIC_API_KEY` in `new`.
    pub api_key: String,
}
 24
 25impl PlainLlmClient {
 26    pub fn new() -> Result<Self> {
 27        let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
 28        let api_key = std::env::var("ANTHROPIC_API_KEY")
 29            .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;
 30        Ok(Self {
 31            http_client,
 32            api_key,
 33        })
 34    }
 35
 36    pub async fn generate(
 37        &self,
 38        model: &str,
 39        max_tokens: u64,
 40        messages: Vec<Message>,
 41    ) -> Result<AnthropicResponse> {
 42        let request = AnthropicRequest {
 43            model: model.to_string(),
 44            max_tokens,
 45            messages,
 46            tools: Vec::new(),
 47            thinking: None,
 48            tool_choice: None,
 49            system: None,
 50            metadata: None,
 51            output_config: None,
 52            stop_sequences: Vec::new(),
 53            speed: None,
 54            temperature: None,
 55            top_k: None,
 56            top_p: None,
 57        };
 58
 59        let response = non_streaming_completion(
 60            self.http_client.as_ref(),
 61            ANTHROPIC_API_URL,
 62            &self.api_key,
 63            request,
 64            None,
 65        )
 66        .await
 67        .map_err(|e| anyhow::anyhow!("{:?}", e))?;
 68
 69        Ok(response)
 70    }
 71
 72    pub async fn generate_streaming<F>(
 73        &self,
 74        model: &str,
 75        max_tokens: u64,
 76        messages: Vec<Message>,
 77        mut on_progress: F,
 78    ) -> Result<AnthropicResponse>
 79    where
 80        F: FnMut(usize, &str),
 81    {
 82        let request = AnthropicRequest {
 83            model: model.to_string(),
 84            max_tokens,
 85            messages,
 86            tools: Vec::new(),
 87            thinking: None,
 88            tool_choice: None,
 89            system: None,
 90            metadata: None,
 91            output_config: None,
 92            stop_sequences: Vec::new(),
 93            speed: None,
 94            temperature: None,
 95            top_k: None,
 96            top_p: None,
 97        };
 98
 99        let mut stream = stream_completion(
100            self.http_client.as_ref(),
101            ANTHROPIC_API_URL,
102            &self.api_key,
103            request,
104            None,
105        )
106        .await
107        .map_err(|e| anyhow::anyhow!("{:?}", e))?;
108
109        let mut response: Option<AnthropicResponse> = None;
110        let mut text_content = String::new();
111
112        while let Some(event_result) = stream.next().await {
113            let event = event_result.map_err(|e| anyhow::anyhow!("{:?}", e))?;
114
115            match event {
116                Event::MessageStart { message } => {
117                    response = Some(message);
118                }
119                Event::ContentBlockDelta { delta, .. } => {
120                    if let anthropic::ContentDelta::TextDelta { text } = delta {
121                        text_content.push_str(&text);
122                        on_progress(text_content.len(), &text_content);
123                    }
124                }
125                _ => {}
126            }
127        }
128
129        let mut response = response.ok_or_else(|| anyhow::anyhow!("No response received"))?;
130
131        if response.content.is_empty() && !text_content.is_empty() {
132            response
133                .content
134                .push(ResponseContent::Text { text: text_content });
135        }
136
137        Ok(response)
138    }
139}
140
/// Client that defers requests to the Anthropic batch API, caching both
/// requests and responses in a local SQLite database (table `cache`).
pub struct BatchingLlmClient {
    // SQLite cache connection; wrapped in a mutex for shared access.
    connection: Mutex<sqlez::connection::Connection>,
    // Shared HTTP transport used for all API calls.
    http_client: Arc<dyn HttpClient>,
    // Anthropic API key, read from `ANTHROPIC_API_KEY` in `new`.
    api_key: String,
}
146
/// One row of the `cache` table.
struct CacheRow {
    // Hex-encoded hash of (model, max_tokens, message texts, seed); primary key.
    request_hash: String,
    // Serialized `SerializableRequest` JSON, when known.
    request: Option<String>,
    // Serialized `AnthropicResponse` (or error JSON) once downloaded.
    response: Option<String>,
    // ID of the uploaded batch containing this request, once uploaded.
    batch_id: Option<String>,
}
153
impl StaticColumnCount for CacheRow {
    /// A `CacheRow` binds exactly four columns:
    /// request_hash, request, response, batch_id.
    fn column_count() -> usize {
        4
    }
}
159
160impl Bind for CacheRow {
161    fn bind(&self, statement: &sqlez::statement::Statement, start_index: i32) -> Result<i32> {
162        let next_index = statement.bind(&self.request_hash, start_index)?;
163        let next_index = statement.bind(&self.request, next_index)?;
164        let next_index = statement.bind(&self.response, next_index)?;
165        let next_index = statement.bind(&self.batch_id, next_index)?;
166        Ok(next_index)
167    }
168}
169
/// JSON-serializable form of a request, stored in the cache's `request` column
/// and rebuilt into an `AnthropicRequest` at upload time.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableRequest {
    model: String,
    max_tokens: u64,
    messages: Vec<SerializableMessage>,
}
176
/// JSON-serializable message: role as a string ("user"/"assistant") and the
/// message's text content flattened to a single string.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableMessage {
    role: String,
    content: String,
}
182
183impl BatchingLlmClient {
184    fn new(cache_path: &Path) -> Result<Self> {
185        let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
186        let api_key = std::env::var("ANTHROPIC_API_KEY")
187            .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;
188
189        let connection = sqlez::connection::Connection::open_file(&cache_path.to_str().unwrap());
190        let mut statement = sqlez::statement::Statement::prepare(
191            &connection,
192            indoc! {"
193                CREATE TABLE IF NOT EXISTS cache (
194                    request_hash TEXT PRIMARY KEY,
195                    request TEXT,
196                    response TEXT,
197                    batch_id TEXT
198                );
199                "},
200        )?;
201        statement.exec()?;
202        drop(statement);
203
204        Ok(Self {
205            connection: Mutex::new(connection),
206            http_client,
207            api_key,
208        })
209    }
210
    /// Looks up a cached response for the given request parameters.
    ///
    /// Returns `Ok(None)` when no row with a non-NULL response exists, or when
    /// the stored text does not deserialize as an `AnthropicResponse` (for
    /// instance the error JSON written for failed batch entries) — parse
    /// failures are silently treated as cache misses.
    pub fn lookup(
        &self,
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> Result<Option<AnthropicResponse>> {
        let request_hash_str = Self::request_hash(model, max_tokens, messages, seed);
        let connection = self.connection.lock().unwrap();
        let response: Vec<String> = connection.select_bound(
            &sql!(SELECT response FROM cache WHERE request_hash = ?1 AND response IS NOT NULL;),
        )?(request_hash_str.as_str())?;
        Ok(response
            .into_iter()
            .next()
            // Deserialization failures (e.g. stored error objects) become None.
            .and_then(|text| serde_json::from_str(&text).ok()))
    }
228
229    pub fn mark_for_batch(
230        &self,
231        model: &str,
232        max_tokens: u64,
233        messages: &[Message],
234        seed: Option<usize>,
235    ) -> Result<()> {
236        let request_hash = Self::request_hash(model, max_tokens, messages, seed);
237
238        let serializable_messages: Vec<SerializableMessage> = messages
239            .iter()
240            .map(|msg| SerializableMessage {
241                role: match msg.role {
242                    Role::User => "user".to_string(),
243                    Role::Assistant => "assistant".to_string(),
244                },
245                content: message_content_to_string(&msg.content),
246            })
247            .collect();
248
249        let serializable_request = SerializableRequest {
250            model: model.to_string(),
251            max_tokens,
252            messages: serializable_messages,
253        };
254
255        let request = Some(serde_json::to_string(&serializable_request)?);
256        let cache_row = CacheRow {
257            request_hash,
258            request,
259            response: None,
260            batch_id: None,
261        };
262        let connection = self.connection.lock().unwrap();
263        connection.exec_bound::<CacheRow>(sql!(
264            INSERT OR IGNORE INTO cache(request_hash, request, response, batch_id) VALUES (?, ?, ?, ?)))?(
265            cache_row,
266        )
267    }
268
269    async fn generate(
270        &self,
271        model: &str,
272        max_tokens: u64,
273        messages: Vec<Message>,
274        seed: Option<usize>,
275        cache_only: bool,
276    ) -> Result<Option<AnthropicResponse>> {
277        let response = self.lookup(model, max_tokens, &messages, seed)?;
278        if let Some(response) = response {
279            return Ok(Some(response));
280        }
281
282        if !cache_only {
283            self.mark_for_batch(model, max_tokens, &messages, seed)?;
284        }
285
286        Ok(None)
287    }
288
289    /// Uploads pending requests as batches (chunked to 16k each); downloads finished batches if any.
290    async fn sync_batches(&self) -> Result<()> {
291        let _batch_ids = self.upload_pending_requests().await?;
292        self.download_finished_batches().await
293    }
294
    /// Import batch results from external batch IDs (useful for recovering after database loss)
    ///
    /// For each ID: retrieves the batch, skips it unless processing has ended,
    /// then writes every succeeded/errored result into the cache within a
    /// savepoint. Canceled and expired results are only logged here.
    pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
        for batch_id in batch_ids {
            log::info!("Importing batch {}", batch_id);

            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("Failed to retrieve batch {}: {:?}", batch_id, e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            // Results are only available once the batch has finished processing.
            if batch_status.processing_status != "ended" {
                log::warn!(
                    "Batch {} is not finished (status: {}), skipping",
                    batch_id,
                    batch_status.processing_status
                );
                continue;
            }

            let results = anthropic::batches::retrieve_batch_results(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| {
                anyhow::anyhow!("Failed to retrieve batch results for {}: {:?}", batch_id, e)
            })?;

            // (request_hash, response-or-error JSON, batch_id) triples to persist.
            let mut updates: Vec<(String, String, String)> = Vec::new();
            let mut success_count = 0;
            let mut error_count = 0;

            for result in results {
                // custom_id was written as "req_hash_<hash>" at upload time.
                let request_hash = result
                    .custom_id
                    .strip_prefix("req_hash_")
                    .unwrap_or(&result.custom_id)
                    .to_string();

                match result.result {
                    anthropic::batches::BatchResult::Succeeded { message } => {
                        let response_json = serde_json::to_string(&message)?;
                        updates.push((request_hash, response_json, batch_id.clone()));
                        success_count += 1;
                    }
                    anthropic::batches::BatchResult::Errored { error } => {
                        log::error!(
                            "Batch request {} failed: {}: {}",
                            request_hash,
                            error.error.error_type,
                            error.error.message
                        );
                        // Persist the failure as error JSON so it is visible in the cache.
                        let error_json = serde_json::json!({
                            "error": {
                                "type": error.error.error_type,
                                "message": error.error.message
                            }
                        })
                        .to_string();
                        updates.push((request_hash, error_json, batch_id.clone()));
                        error_count += 1;
                    }
                    // NOTE(review): unlike download_finished_batches, canceled and
                    // expired results are NOT written to the cache here, so those
                    // requests remain pending — confirm this asymmetry is intended.
                    anthropic::batches::BatchResult::Canceled => {
                        log::warn!("Batch request {} was canceled", request_hash);
                        error_count += 1;
                    }
                    anthropic::batches::BatchResult::Expired => {
                        log::warn!("Batch request {} expired", request_hash);
                        error_count += 1;
                    }
                }
            }

            let connection = self.connection.lock().unwrap();
            connection.with_savepoint("batch_import", || {
                // Use INSERT OR REPLACE to handle both new entries and updating existing ones
                // (the subselect carries over any request text already stored for this hash).
                let q = sql!(
                    INSERT OR REPLACE INTO cache(request_hash, request, response, batch_id)
                    VALUES (?, (SELECT request FROM cache WHERE request_hash = ?), ?, ?)
                );
                let mut exec = connection.exec_bound::<(&str, &str, &str, &str)>(q)?;
                for (request_hash, response_json, batch_id) in &updates {
                    exec((
                        request_hash.as_str(),
                        request_hash.as_str(),
                        response_json.as_str(),
                        batch_id.as_str(),
                    ))?;
                }
                Ok(())
            })?;

            log::info!(
                "Imported batch {}: {} successful, {} errors",
                batch_id,
                success_count,
                error_count
            );
        }

        Ok(())
    }
409
    /// Polls every batch that has an ID but rows still missing a response; for
    /// each batch whose processing has ended, downloads its results and writes
    /// response (or error) JSON into the matching cache rows.
    async fn download_finished_batches(&self) -> Result<()> {
        // Batches that were uploaded but still have undownloaded rows.
        let batch_ids: Vec<String> = {
            let connection = self.connection.lock().unwrap();
            let q = sql!(SELECT DISTINCT batch_id FROM cache WHERE batch_id IS NOT NULL AND response IS NULL);
            connection.select(q)?()?
        };

        for batch_id in &batch_ids {
            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                &batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            // Results only exist once the batch has finished processing.
            if batch_status.processing_status == "ended" {
                let results = anthropic::batches::retrieve_batch_results(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    &batch_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // (response-or-error JSON, request_hash) pairs to persist.
                let mut updates: Vec<(String, String)> = Vec::new();
                let mut success_count = 0;
                for result in results {
                    // custom_id was written as "req_hash_<hash>" at upload time.
                    let request_hash = result
                        .custom_id
                        .strip_prefix("req_hash_")
                        .unwrap_or(&result.custom_id)
                        .to_string();

                    match result.result {
                        anthropic::batches::BatchResult::Succeeded { message } => {
                            let response_json = serde_json::to_string(&message)?;
                            updates.push((response_json, request_hash));
                            success_count += 1;
                        }
                        anthropic::batches::BatchResult::Errored { error } => {
                            log::error!(
                                "Batch request {} failed: {}: {}",
                                request_hash,
                                error.error.error_type,
                                error.error.message
                            );
                            // Failures are persisted as error JSON in the response column.
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": error.error.error_type,
                                    "message": error.error.message
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Canceled => {
                            log::warn!("Batch request {} was canceled", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "canceled",
                                    "message": "Batch request was canceled"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Expired => {
                            log::warn!("Batch request {} expired", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "expired",
                                    "message": "Batch request expired"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                    }
                }

                // Apply all updates for this batch atomically within a savepoint.
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_download", || {
                    let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for (response_json, request_hash) in &updates {
                        exec((response_json.as_str(), request_hash.as_str()))?;
                    }
                    Ok(())
                })?;
                log::info!("Downloaded {} successful requests", success_count);
            }
        }

        Ok(())
    }
514
    /// Uploads all cached requests that have neither a response nor a batch id,
    /// splitting them into batches capped by both row count and estimated
    /// payload size, and returns the IDs of every batch created.
    async fn upload_pending_requests(&self) -> Result<Vec<String>> {
        // Max rows fetched from SQLite per loop iteration.
        const BATCH_CHUNK_SIZE: i32 = 16_000;
        // Soft cap on the estimated serialized size of one uploaded batch (100 MB).
        const MAX_BATCH_SIZE_BYTES: usize = 100 * 1024 * 1024;

        let mut all_batch_ids = Vec::new();
        let mut total_uploaded = 0;

        // Requests accumulated for the batch currently being assembled, plus a
        // running estimate of its serialized size.
        let mut current_batch_rows = Vec::new();
        let mut current_batch_size = 0usize;
        // Hashes staged in `current_batch_rows` but not yet uploaded; used to
        // skip rows the SELECT returns again on a later iteration (their
        // batch_id is still NULL until upload).
        let mut pending_hashes: HashSet<String> = HashSet::new();
        loop {
            let rows: Vec<(String, String)> = {
                let connection = self.connection.lock().unwrap();
                let q = sql!(
                    SELECT request_hash, request FROM cache
                    WHERE batch_id IS NULL AND response IS NULL
                    LIMIT ?
                );
                connection.select_bound(q)?(BATCH_CHUNK_SIZE)?
            };

            if rows.is_empty() {
                break;
            }

            // Split rows into sub-batches based on size
            let mut batches_to_upload = Vec::new();
            let mut new_rows_added = 0;

            for row in rows {
                let (hash, request_str) = row;

                // Skip rows already added to current_batch_rows but not yet uploaded
                if pending_hashes.contains(&hash) {
                    continue;
                }
                let serializable_request: SerializableRequest = serde_json::from_str(&request_str)?;

                // Rebuild API `Message`s from the cached serializable form.
                let messages: Vec<Message> = serializable_request
                    .messages
                    .into_iter()
                    .map(|msg| Message {
                        role: match msg.role.as_str() {
                            "user" => Role::User,
                            "assistant" => Role::Assistant,
                            // Unknown role strings fall back to User.
                            _ => Role::User,
                        },
                        content: vec![RequestContent::Text {
                            text: msg.content,
                            cache_control: None,
                        }],
                    })
                    .collect();

                let params = AnthropicRequest {
                    model: serializable_request.model,
                    max_tokens: serializable_request.max_tokens,
                    messages,
                    tools: Vec::new(),
                    thinking: None,
                    tool_choice: None,
                    system: None,
                    metadata: None,
                    output_config: None,
                    stop_sequences: Vec::new(),
                    temperature: None,
                    top_k: None,
                    top_p: None,
                    speed: None,
                };

                // The custom_id round-trips the cache key through the batch API;
                // results are matched back by stripping this prefix.
                let custom_id = format!("req_hash_{}", hash);
                let batch_request = anthropic::batches::BatchRequest { custom_id, params };

                // Estimate the serialized size of this request
                let estimated_size = serde_json::to_string(&batch_request)?.len();

                // If adding this request would exceed the limit, start a new batch
                if !current_batch_rows.is_empty()
                    && current_batch_size + estimated_size > MAX_BATCH_SIZE_BYTES
                {
                    batches_to_upload.push((current_batch_rows, current_batch_size));
                    current_batch_rows = Vec::new();
                    current_batch_size = 0;
                }

                pending_hashes.insert(hash.clone());
                current_batch_rows.push((hash, batch_request));
                current_batch_size += estimated_size;
                new_rows_added += 1;
            }

            // If no new rows were added this iteration, all pending requests are already
            // in current_batch_rows, so we should break to avoid an infinite loop
            if new_rows_added == 0 {
                break;
            }

            // Only upload full batches, keep the partial batch for the next iteration
            // Upload each sub-batch
            for (batch_rows, batch_size) in batches_to_upload {
                let request_hashes: Vec<String> =
                    batch_rows.iter().map(|(hash, _)| hash.clone()).collect();

                // Remove uploaded hashes from pending set
                for hash in &request_hashes {
                    pending_hashes.remove(hash);
                }
                let batch_requests: Vec<anthropic::batches::BatchRequest> =
                    batch_rows.into_iter().map(|(_, req)| req).collect();

                let batch_len = batch_requests.len();
                log::info!(
                    "Uploading batch with {} requests (~{:.2} MB)",
                    batch_len,
                    batch_size as f64 / (1024.0 * 1024.0)
                );

                let batch = anthropic::batches::create_batch(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    anthropic::batches::CreateBatchRequest {
                        requests: batch_requests,
                    },
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // Persist the batch id so these rows stop matching the pending SELECT.
                {
                    let connection = self.connection.lock().unwrap();
                    connection.with_savepoint("batch_upload", || {
                        let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                        let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                        for hash in &request_hashes {
                            exec((batch.id.as_str(), hash.as_str()))?;
                        }
                        Ok(())
                    })?;
                }

                total_uploaded += batch_len;
                log::info!(
                    "Uploaded batch {} with {} requests ({} total)",
                    batch.id,
                    batch_len,
                    total_uploaded
                );

                all_batch_ids.push(batch.id);
            }
        }

        // Upload any remaining partial batch at the end
        if !current_batch_rows.is_empty() {
            let request_hashes: Vec<String> = current_batch_rows
                .iter()
                .map(|(hash, _)| hash.clone())
                .collect();
            let batch_requests: Vec<anthropic::batches::BatchRequest> =
                current_batch_rows.into_iter().map(|(_, req)| req).collect();

            let batch_len = batch_requests.len();
            log::info!(
                "Uploading final batch with {} requests (~{:.2} MB)",
                batch_len,
                current_batch_size as f64 / (1024.0 * 1024.0)
            );

            let batch = anthropic::batches::create_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                anthropic::batches::CreateBatchRequest {
                    requests: batch_requests,
                },
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            // Persist the batch id for the final partial batch as well.
            {
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_upload", || {
                    let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for hash in &request_hashes {
                        exec((batch.id.as_str(), hash.as_str()))?;
                    }
                    Ok(())
                })?;
            }

            total_uploaded += batch_len;
            log::info!(
                "Uploaded batch {} with {} requests ({} total)",
                batch.id,
                batch_len,
                total_uploaded
            );

            all_batch_ids.push(batch.id);
        }

        if !all_batch_ids.is_empty() {
            log::info!(
                "Finished uploading {} batches with {} total requests",
                all_batch_ids.len(),
                total_uploaded
            );
        }

        Ok(all_batch_ids)
    }
728
    /// Computes the cache key for a request: 16 hex digits of a
    /// `DefaultHasher` run over the model, max_tokens, each message's text
    /// content, and the optional seed.
    ///
    /// NOTE(review): message roles are not hashed, so a user message and an
    /// assistant message with identical text produce the same key — confirm
    /// this is intended. Also, `DefaultHasher`'s algorithm is not guaranteed
    /// stable across Rust releases, which would silently invalidate an
    /// existing cache database after a toolchain upgrade.
    fn request_hash(
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> String {
        let mut hasher = std::hash::DefaultHasher::new();
        model.hash(&mut hasher);
        max_tokens.hash(&mut hasher);
        for msg in messages {
            message_content_to_string(&msg.content).hash(&mut hasher);
        }
        if let Some(seed) = seed {
            seed.hash(&mut hasher);
        }
        let request_hash = hasher.finish();
        format!("{request_hash:016x}")
    }
747}
748
749fn message_content_to_string(content: &[RequestContent]) -> String {
750    content
751        .iter()
752        .filter_map(|c| match c {
753            RequestContent::Text { text, .. } => Some(text.clone()),
754            _ => None,
755        })
756        .collect::<Vec<String>>()
757        .join("\n")
758}
759
/// Front-end over the client implementations, selected at construction time.
pub enum AnthropicClient {
    // No batching
    Plain(PlainLlmClient),
    // SQLite-cached client using the Anthropic batch API.
    Batch(BatchingLlmClient),
    // Placeholder that panics on any use.
    Dummy,
}
766
767impl AnthropicClient {
    /// Creates a non-batching client (requires `ANTHROPIC_API_KEY`).
    pub fn plain() -> Result<Self> {
        Ok(Self::Plain(PlainLlmClient::new()?))
    }

    /// Creates a batching client backed by the SQLite cache at `cache_path`
    /// (also requires `ANTHROPIC_API_KEY`).
    pub fn batch(cache_path: &Path) -> Result<Self> {
        Ok(Self::Batch(BatchingLlmClient::new(cache_path)?))
    }

    /// Creates a placeholder client; any API call on it panics.
    #[allow(dead_code)]
    pub fn dummy() -> Self {
        Self::Dummy
    }
780
781    pub async fn generate(
782        &self,
783        model: &str,
784        max_tokens: u64,
785        messages: Vec<Message>,
786        seed: Option<usize>,
787        cache_only: bool,
788    ) -> Result<Option<AnthropicResponse>> {
789        match self {
790            AnthropicClient::Plain(plain_llm_client) => plain_llm_client
791                .generate(model, max_tokens, messages)
792                .await
793                .map(Some),
794            AnthropicClient::Batch(batching_llm_client) => {
795                batching_llm_client
796                    .generate(model, max_tokens, messages, seed, cache_only)
797                    .await
798            }
799            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
800        }
801    }
802
803    #[allow(dead_code)]
804    pub async fn generate_streaming<F>(
805        &self,
806        model: &str,
807        max_tokens: u64,
808        messages: Vec<Message>,
809        on_progress: F,
810    ) -> Result<Option<AnthropicResponse>>
811    where
812        F: FnMut(usize, &str),
813    {
814        match self {
815            AnthropicClient::Plain(plain_llm_client) => plain_llm_client
816                .generate_streaming(model, max_tokens, messages, on_progress)
817                .await
818                .map(Some),
819            AnthropicClient::Batch(_) => {
820                anyhow::bail!("Streaming not supported with batching client")
821            }
822            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
823        }
824    }
825
826    pub async fn sync_batches(&self) -> Result<()> {
827        match self {
828            AnthropicClient::Plain(_) => Ok(()),
829            AnthropicClient::Batch(batching_llm_client) => batching_llm_client.sync_batches().await,
830            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
831        }
832    }
833
834    pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
835        match self {
836            AnthropicClient::Plain(_) => {
837                anyhow::bail!("Import batches is only supported with batching client")
838            }
839            AnthropicClient::Batch(batching_llm_client) => {
840                batching_llm_client.import_batches(batch_ids).await
841            }
842            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
843        }
844    }
845}