1use anthropic::{
2 ANTHROPIC_API_URL, Event, Message, Request as AnthropicRequest, RequestContent,
3 Response as AnthropicResponse, ResponseContent, Role, non_streaming_completion,
4 stream_completion,
5};
6use anyhow::Result;
7use futures::StreamExt as _;
8use http_client::HttpClient;
9use indoc::indoc;
10use reqwest_client::ReqwestClient;
11use sqlez::bindable::Bind;
12use sqlez::bindable::StaticColumnCount;
13use sqlez_macros::sql;
14use std::collections::HashSet;
15use std::hash::Hash;
16use std::hash::Hasher;
17use std::path::Path;
18use std::sync::{Arc, Mutex};
19
/// Client that sends each request straight to the Anthropic API,
/// with no caching or batching.
pub struct PlainLlmClient {
    /// HTTP transport used for all API calls.
    pub http_client: Arc<dyn HttpClient>,
    /// Anthropic API key, read from `ANTHROPIC_API_KEY` in `new()`.
    pub api_key: String,
}
24
25impl PlainLlmClient {
26 pub fn new() -> Result<Self> {
27 let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
28 let api_key = std::env::var("ANTHROPIC_API_KEY")
29 .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;
30 Ok(Self {
31 http_client,
32 api_key,
33 })
34 }
35
36 pub async fn generate(
37 &self,
38 model: &str,
39 max_tokens: u64,
40 messages: Vec<Message>,
41 ) -> Result<AnthropicResponse> {
42 let request = AnthropicRequest {
43 model: model.to_string(),
44 max_tokens,
45 messages,
46 tools: Vec::new(),
47 thinking: None,
48 tool_choice: None,
49 system: None,
50 metadata: None,
51 output_config: None,
52 stop_sequences: Vec::new(),
53 temperature: None,
54 top_k: None,
55 top_p: None,
56 };
57
58 let response = non_streaming_completion(
59 self.http_client.as_ref(),
60 ANTHROPIC_API_URL,
61 &self.api_key,
62 request,
63 None,
64 )
65 .await
66 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
67
68 Ok(response)
69 }
70
71 pub async fn generate_streaming<F>(
72 &self,
73 model: &str,
74 max_tokens: u64,
75 messages: Vec<Message>,
76 mut on_progress: F,
77 ) -> Result<AnthropicResponse>
78 where
79 F: FnMut(usize, &str),
80 {
81 let request = AnthropicRequest {
82 model: model.to_string(),
83 max_tokens,
84 messages,
85 tools: Vec::new(),
86 thinking: None,
87 tool_choice: None,
88 system: None,
89 metadata: None,
90 output_config: None,
91 stop_sequences: Vec::new(),
92 temperature: None,
93 top_k: None,
94 top_p: None,
95 };
96
97 let mut stream = stream_completion(
98 self.http_client.as_ref(),
99 ANTHROPIC_API_URL,
100 &self.api_key,
101 request,
102 None,
103 )
104 .await
105 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
106
107 let mut response: Option<AnthropicResponse> = None;
108 let mut text_content = String::new();
109
110 while let Some(event_result) = stream.next().await {
111 let event = event_result.map_err(|e| anyhow::anyhow!("{:?}", e))?;
112
113 match event {
114 Event::MessageStart { message } => {
115 response = Some(message);
116 }
117 Event::ContentBlockDelta { delta, .. } => {
118 if let anthropic::ContentDelta::TextDelta { text } = delta {
119 text_content.push_str(&text);
120 on_progress(text_content.len(), &text_content);
121 }
122 }
123 _ => {}
124 }
125 }
126
127 let mut response = response.ok_or_else(|| anyhow::anyhow!("No response received"))?;
128
129 if response.content.is_empty() && !text_content.is_empty() {
130 response
131 .content
132 .push(ResponseContent::Text { text: text_content });
133 }
134
135 Ok(response)
136 }
137}
138
/// Client that defers requests to the Anthropic batch API, caching requests
/// and responses in a local SQLite database.
pub struct BatchingLlmClient {
    // SQLite-backed cache; the Mutex serializes access across callers.
    connection: Mutex<sqlez::connection::Connection>,
    // HTTP transport used for all API calls.
    http_client: Arc<dyn HttpClient>,
    // Anthropic API key, read from `ANTHROPIC_API_KEY` in `new()`.
    api_key: String,
}
144
/// One row of the on-disk request cache (table `cache`).
struct CacheRow {
    // Hex-encoded hash of the request parameters; the table's primary key.
    request_hash: String,
    // Serialized `SerializableRequest` JSON for this row, if stored.
    request: Option<String>,
    // Serialized response (or error) JSON; `None` while the request is pending.
    response: Option<String>,
    // Id of the Anthropic batch this request was uploaded in; `None` until uploaded.
    batch_id: Option<String>,
}
151
impl StaticColumnCount for CacheRow {
    /// Number of SQL parameters bound by `Bind::bind` below — must stay in
    /// sync with the four fields of `CacheRow`.
    fn column_count() -> usize {
        4
    }
}
157
158impl Bind for CacheRow {
159 fn bind(&self, statement: &sqlez::statement::Statement, start_index: i32) -> Result<i32> {
160 let next_index = statement.bind(&self.request_hash, start_index)?;
161 let next_index = statement.bind(&self.request, next_index)?;
162 let next_index = statement.bind(&self.response, next_index)?;
163 let next_index = statement.bind(&self.batch_id, next_index)?;
164 Ok(next_index)
165 }
166}
167
/// JSON-serializable form of a request, stored in the cache's `request`
/// column and rehydrated into an `AnthropicRequest` at upload time.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableRequest {
    model: String,
    max_tokens: u64,
    messages: Vec<SerializableMessage>,
}
174
/// JSON-serializable message: the role as a string ("user"/"assistant") and
/// the message's text content flattened to a single string.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableMessage {
    role: String,
    content: String,
}
180
181impl BatchingLlmClient {
182 fn new(cache_path: &Path) -> Result<Self> {
183 let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
184 let api_key = std::env::var("ANTHROPIC_API_KEY")
185 .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;
186
187 let connection = sqlez::connection::Connection::open_file(&cache_path.to_str().unwrap());
188 let mut statement = sqlez::statement::Statement::prepare(
189 &connection,
190 indoc! {"
191 CREATE TABLE IF NOT EXISTS cache (
192 request_hash TEXT PRIMARY KEY,
193 request TEXT,
194 response TEXT,
195 batch_id TEXT
196 );
197 "},
198 )?;
199 statement.exec()?;
200 drop(statement);
201
202 Ok(Self {
203 connection: Mutex::new(connection),
204 http_client,
205 api_key,
206 })
207 }
208
    /// Looks up a cached response for the given request parameters.
    ///
    /// Returns `Ok(None)` when no response row exists, or when the stored
    /// payload does not deserialize into an `AnthropicResponse` (e.g. the
    /// error-JSON blobs written for failed batch entries) — such rows are
    /// treated as cache misses.
    pub fn lookup(
        &self,
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> Result<Option<AnthropicResponse>> {
        // The cache is keyed on a hash of (model, max_tokens, messages, seed).
        let request_hash_str = Self::request_hash(model, max_tokens, messages, seed);
        let connection = self.connection.lock().unwrap();
        let response: Vec<String> = connection.select_bound(
            &sql!(SELECT response FROM cache WHERE request_hash = ?1 AND response IS NOT NULL;),
        )?(request_hash_str.as_str())?;
        Ok(response
            .into_iter()
            .next()
            // Deserialization failures are deliberately swallowed (`.ok()`)
            // and reported as a miss rather than an error.
            .and_then(|text| serde_json::from_str(&text).ok()))
    }
226
    /// Records a request in the cache as pending (no response, no batch id)
    /// so a later `upload_pending_requests` call will include it in a batch.
    pub fn mark_for_batch(
        &self,
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> Result<()> {
        let request_hash = Self::request_hash(model, max_tokens, messages, seed);

        // Flatten each message to role + concatenated text so the request can
        // round-trip through JSON in the `request` column.
        let serializable_messages: Vec<SerializableMessage> = messages
            .iter()
            .map(|msg| SerializableMessage {
                role: match msg.role {
                    Role::User => "user".to_string(),
                    Role::Assistant => "assistant".to_string(),
                },
                content: message_content_to_string(&msg.content),
            })
            .collect();

        let serializable_request = SerializableRequest {
            model: model.to_string(),
            max_tokens,
            messages: serializable_messages,
        };

        let request = Some(serde_json::to_string(&serializable_request)?);
        let cache_row = CacheRow {
            request_hash,
            request,
            response: None,
            batch_id: None,
        };
        let connection = self.connection.lock().unwrap();
        // INSERT OR IGNORE: if the request was already marked (or answered),
        // the existing row is left untouched.
        connection.exec_bound::<CacheRow>(sql!(
            INSERT OR IGNORE INTO cache(request_hash, request, response, batch_id) VALUES (?, ?, ?, ?)))?(
            cache_row,
        )
    }
266
267 async fn generate(
268 &self,
269 model: &str,
270 max_tokens: u64,
271 messages: Vec<Message>,
272 seed: Option<usize>,
273 cache_only: bool,
274 ) -> Result<Option<AnthropicResponse>> {
275 let response = self.lookup(model, max_tokens, &messages, seed)?;
276 if let Some(response) = response {
277 return Ok(Some(response));
278 }
279
280 if !cache_only {
281 self.mark_for_batch(model, max_tokens, &messages, seed)?;
282 }
283
284 Ok(None)
285 }
286
    /// Uploads pending requests as batches (chunked to 16k each); downloads finished batches if any.
    async fn sync_batches(&self) -> Result<()> {
        // Upload first so newly-marked requests start processing. The returned
        // batch ids are already persisted in the cache table, so they are not
        // needed here.
        let _batch_ids = self.upload_pending_requests().await?;
        self.download_finished_batches().await
    }
292
    /// Import batch results from external batch IDs (useful for recovering after database loss)
    pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
        for batch_id in batch_ids {
            log::info!("Importing batch {}", batch_id);

            // Fetch batch metadata to check whether processing has finished.
            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("Failed to retrieve batch {}: {:?}", batch_id, e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            // Unfinished batches are skipped (not an error) so the import can
            // be re-run later for the remaining ones.
            if batch_status.processing_status != "ended" {
                log::warn!(
                    "Batch {} is not finished (status: {}), skipping",
                    batch_id,
                    batch_status.processing_status
                );
                continue;
            }

            let results = anthropic::batches::retrieve_batch_results(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| {
                anyhow::anyhow!("Failed to retrieve batch results for {}: {:?}", batch_id, e)
            })?;

            // (request_hash, response_or_error_json, batch_id) rows to persist.
            let mut updates: Vec<(String, String, String)> = Vec::new();
            let mut success_count = 0;
            let mut error_count = 0;

            for result in results {
                // custom_id was written as "req_hash_<hash>" at upload time;
                // tolerate ids without the prefix.
                let request_hash = result
                    .custom_id
                    .strip_prefix("req_hash_")
                    .unwrap_or(&result.custom_id)
                    .to_string();

                match result.result {
                    anthropic::batches::BatchResult::Succeeded { message } => {
                        let response_json = serde_json::to_string(&message)?;
                        updates.push((request_hash, response_json, batch_id.clone()));
                        success_count += 1;
                    }
                    anthropic::batches::BatchResult::Errored { error } => {
                        log::error!(
                            "Batch request {} failed: {}: {}",
                            request_hash,
                            error.error.error_type,
                            error.error.message
                        );
                        // Persist the failure as error JSON so the row is no
                        // longer considered pending.
                        let error_json = serde_json::json!({
                            "error": {
                                "type": error.error.error_type,
                                "message": error.error.message
                            }
                        })
                        .to_string();
                        updates.push((request_hash, error_json, batch_id.clone()));
                        error_count += 1;
                    }
                    // NOTE(review): canceled/expired results are only counted
                    // here and not written to the cache, unlike
                    // download_finished_batches which stores an error JSON for
                    // them — confirm this asymmetry is intentional.
                    anthropic::batches::BatchResult::Canceled => {
                        log::warn!("Batch request {} was canceled", request_hash);
                        error_count += 1;
                    }
                    anthropic::batches::BatchResult::Expired => {
                        log::warn!("Batch request {} expired", request_hash);
                        error_count += 1;
                    }
                }
            }

            // Apply all of this batch's updates atomically.
            let connection = self.connection.lock().unwrap();
            connection.with_savepoint("batch_import", || {
                // Use INSERT OR REPLACE to handle both new entries and updating existing ones
                let q = sql!(
                    INSERT OR REPLACE INTO cache(request_hash, request, response, batch_id)
                    VALUES (?, (SELECT request FROM cache WHERE request_hash = ?), ?, ?)
                );
                let mut exec = connection.exec_bound::<(&str, &str, &str, &str)>(q)?;
                for (request_hash, response_json, batch_id) in &updates {
                    exec((
                        request_hash.as_str(),
                        request_hash.as_str(),
                        response_json.as_str(),
                        batch_id.as_str(),
                    ))?;
                }
                Ok(())
            })?;

            log::info!(
                "Imported batch {}: {} successful, {} errors",
                batch_id,
                success_count,
                error_count
            );
        }

        Ok(())
    }
407
    /// Polls every batch that still has unanswered rows; for batches whose
    /// processing status is "ended", downloads the results and stores each
    /// row's response (or an error JSON for errored/canceled/expired entries).
    async fn download_finished_batches(&self) -> Result<()> {
        // Batch ids with at least one row still awaiting a response.
        let batch_ids: Vec<String> = {
            let connection = self.connection.lock().unwrap();
            let q = sql!(SELECT DISTINCT batch_id FROM cache WHERE batch_id IS NOT NULL AND response IS NULL);
            connection.select(q)?()?
        };

        for batch_id in &batch_ids {
            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                &batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            // Only "ended" batches have results; others are polled again on
            // the next sync.
            if batch_status.processing_status == "ended" {
                let results = anthropic::batches::retrieve_batch_results(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    &batch_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // (response_or_error_json, request_hash) pairs to persist.
                let mut updates: Vec<(String, String)> = Vec::new();
                let mut success_count = 0;
                for result in results {
                    // custom_id was written as "req_hash_<hash>" at upload
                    // time; tolerate ids without the prefix.
                    let request_hash = result
                        .custom_id
                        .strip_prefix("req_hash_")
                        .unwrap_or(&result.custom_id)
                        .to_string();

                    match result.result {
                        anthropic::batches::BatchResult::Succeeded { message } => {
                            let response_json = serde_json::to_string(&message)?;
                            updates.push((response_json, request_hash));
                            success_count += 1;
                        }
                        // Non-success outcomes are persisted as error JSON so
                        // the rows stop being pending; `lookup` treats them as
                        // cache misses since they don't deserialize into a
                        // response.
                        anthropic::batches::BatchResult::Errored { error } => {
                            log::error!(
                                "Batch request {} failed: {}: {}",
                                request_hash,
                                error.error.error_type,
                                error.error.message
                            );
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": error.error.error_type,
                                    "message": error.error.message
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Canceled => {
                            log::warn!("Batch request {} was canceled", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "canceled",
                                    "message": "Batch request was canceled"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Expired => {
                            log::warn!("Batch request {} expired", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "expired",
                                    "message": "Batch request expired"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                    }
                }

                // Apply all of this batch's updates atomically.
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_download", || {
                    let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for (response_json, request_hash) in &updates {
                        exec((response_json.as_str(), request_hash.as_str()))?;
                    }
                    Ok(())
                })?;
                log::info!("Downloaded {} successful requests", success_count);
            }
        }

        Ok(())
    }
512
    /// Uploads every cached request that has neither a response nor a batch
    /// id.
    ///
    /// Rows are read from SQLite in chunks of `BATCH_CHUNK_SIZE` and packed
    /// into batches whose serialized size stays under `MAX_BATCH_SIZE_BYTES`;
    /// each uploaded row has its `batch_id` written back so subsequent
    /// SELECTs skip it. Returns the ids of all batches created.
    async fn upload_pending_requests(&self) -> Result<Vec<String>> {
        // Max rows fetched from SQLite per loop iteration.
        const BATCH_CHUNK_SIZE: i32 = 16_000;
        // Cap on the serialized JSON size of a single uploaded batch (100 MiB).
        const MAX_BATCH_SIZE_BYTES: usize = 100 * 1024 * 1024;

        let mut all_batch_ids = Vec::new();
        let mut total_uploaded = 0;

        // Requests packed into the batch currently being assembled, plus the
        // hashes already in it — rows re-fetched by the next SELECT (their
        // batch_id is not yet written) must not be added twice.
        let mut current_batch_rows = Vec::new();
        let mut current_batch_size = 0usize;
        let mut pending_hashes: HashSet<String> = HashSet::new();
        loop {
            // Next chunk of rows that are neither answered nor assigned to a
            // batch yet.
            let rows: Vec<(String, String)> = {
                let connection = self.connection.lock().unwrap();
                let q = sql!(
                    SELECT request_hash, request FROM cache
                    WHERE batch_id IS NULL AND response IS NULL
                    LIMIT ?
                );
                connection.select_bound(q)?(BATCH_CHUNK_SIZE)?
            };

            if rows.is_empty() {
                break;
            }

            // Split rows into sub-batches based on size
            let mut batches_to_upload = Vec::new();
            let mut new_rows_added = 0;

            for row in rows {
                let (hash, request_str) = row;

                // Skip rows already added to current_batch_rows but not yet uploaded
                if pending_hashes.contains(&hash) {
                    continue;
                }
                // Rehydrate the stored request JSON back into API types.
                let serializable_request: SerializableRequest = serde_json::from_str(&request_str)?;

                let messages: Vec<Message> = serializable_request
                    .messages
                    .into_iter()
                    .map(|msg| Message {
                        role: match msg.role.as_str() {
                            "user" => Role::User,
                            "assistant" => Role::Assistant,
                            // Unknown role strings fall back to User rather
                            // than failing the whole upload.
                            _ => Role::User,
                        },
                        content: vec![RequestContent::Text {
                            text: msg.content,
                            cache_control: None,
                        }],
                    })
                    .collect();

                // Mirror of the request shape built when the entry was marked:
                // no tools, system prompt, or sampling overrides.
                let params = AnthropicRequest {
                    model: serializable_request.model,
                    max_tokens: serializable_request.max_tokens,
                    messages,
                    tools: Vec::new(),
                    thinking: None,
                    tool_choice: None,
                    system: None,
                    metadata: None,
                    output_config: None,
                    stop_sequences: Vec::new(),
                    temperature: None,
                    top_k: None,
                    top_p: None,
                };

                // custom_id lets batch results be matched back to cache rows.
                let custom_id = format!("req_hash_{}", hash);
                let batch_request = anthropic::batches::BatchRequest { custom_id, params };

                // Estimate the serialized size of this request
                let estimated_size = serde_json::to_string(&batch_request)?.len();

                // If adding this request would exceed the limit, start a new batch
                if !current_batch_rows.is_empty()
                    && current_batch_size + estimated_size > MAX_BATCH_SIZE_BYTES
                {
                    batches_to_upload.push((current_batch_rows, current_batch_size));
                    current_batch_rows = Vec::new();
                    current_batch_size = 0;
                }

                pending_hashes.insert(hash.clone());
                current_batch_rows.push((hash, batch_request));
                current_batch_size += estimated_size;
                new_rows_added += 1;
            }

            // If no new rows were added this iteration, all pending requests are already
            // in current_batch_rows, so we should break to avoid an infinite loop
            if new_rows_added == 0 {
                break;
            }

            // Only upload full batches, keep the partial batch for the next iteration
            // Upload each sub-batch
            for (batch_rows, batch_size) in batches_to_upload {
                let request_hashes: Vec<String> =
                    batch_rows.iter().map(|(hash, _)| hash.clone()).collect();

                // Remove uploaded hashes from pending set
                for hash in &request_hashes {
                    pending_hashes.remove(hash);
                }
                let batch_requests: Vec<anthropic::batches::BatchRequest> =
                    batch_rows.into_iter().map(|(_, req)| req).collect();

                let batch_len = batch_requests.len();
                log::info!(
                    "Uploading batch with {} requests (~{:.2} MB)",
                    batch_len,
                    batch_size as f64 / (1024.0 * 1024.0)
                );

                let batch = anthropic::batches::create_batch(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    anthropic::batches::CreateBatchRequest {
                        requests: batch_requests,
                    },
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // Record the batch id on every uploaded row so the next
                // SELECT no longer returns them.
                {
                    let connection = self.connection.lock().unwrap();
                    connection.with_savepoint("batch_upload", || {
                        let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                        let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                        for hash in &request_hashes {
                            exec((batch.id.as_str(), hash.as_str()))?;
                        }
                        Ok(())
                    })?;
                }

                total_uploaded += batch_len;
                log::info!(
                    "Uploaded batch {} with {} requests ({} total)",
                    batch.id,
                    batch_len,
                    total_uploaded
                );

                all_batch_ids.push(batch.id);
            }
        }

        // Upload any remaining partial batch at the end
        if !current_batch_rows.is_empty() {
            let request_hashes: Vec<String> = current_batch_rows
                .iter()
                .map(|(hash, _)| hash.clone())
                .collect();
            let batch_requests: Vec<anthropic::batches::BatchRequest> =
                current_batch_rows.into_iter().map(|(_, req)| req).collect();

            let batch_len = batch_requests.len();
            log::info!(
                "Uploading final batch with {} requests (~{:.2} MB)",
                batch_len,
                current_batch_size as f64 / (1024.0 * 1024.0)
            );

            let batch = anthropic::batches::create_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                anthropic::batches::CreateBatchRequest {
                    requests: batch_requests,
                },
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            // Same batch-id write-back as above.
            {
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_upload", || {
                    let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for hash in &request_hashes {
                        exec((batch.id.as_str(), hash.as_str()))?;
                    }
                    Ok(())
                })?;
            }

            total_uploaded += batch_len;
            log::info!(
                "Uploaded batch {} with {} requests ({} total)",
                batch.id,
                batch_len,
                total_uploaded
            );

            all_batch_ids.push(batch.id);
        }

        if !all_batch_ids.is_empty() {
            log::info!(
                "Finished uploading {} batches with {} total requests",
                all_batch_ids.len(),
                total_uploaded
            );
        }

        Ok(all_batch_ids)
    }
725
726 fn request_hash(
727 model: &str,
728 max_tokens: u64,
729 messages: &[Message],
730 seed: Option<usize>,
731 ) -> String {
732 let mut hasher = std::hash::DefaultHasher::new();
733 model.hash(&mut hasher);
734 max_tokens.hash(&mut hasher);
735 for msg in messages {
736 message_content_to_string(&msg.content).hash(&mut hasher);
737 }
738 if let Some(seed) = seed {
739 seed.hash(&mut hasher);
740 }
741 let request_hash = hasher.finish();
742 format!("{request_hash:016x}")
743 }
744}
745
746fn message_content_to_string(content: &[RequestContent]) -> String {
747 content
748 .iter()
749 .filter_map(|c| match c {
750 RequestContent::Text { text, .. } => Some(text.clone()),
751 _ => None,
752 })
753 .collect::<Vec<String>>()
754 .join("\n")
755}
756
/// Front-end over the client implementations.
pub enum AnthropicClient {
    /// Sends each request immediately; no caching or batching.
    Plain(PlainLlmClient),
    /// Caches requests/responses in SQLite and uses the Anthropic batch API.
    Batch(BatchingLlmClient),
    /// Placeholder that panics if used; for code paths that never call the LLM.
    Dummy,
}
763
impl AnthropicClient {
    /// Creates a non-batching client that calls the API directly.
    pub fn plain() -> Result<Self> {
        Ok(Self::Plain(PlainLlmClient::new()?))
    }

    /// Creates a batching client backed by a SQLite cache at `cache_path`.
    pub fn batch(cache_path: &Path) -> Result<Self> {
        Ok(Self::Batch(BatchingLlmClient::new(cache_path)?))
    }

    /// Creates the panic-on-use placeholder client.
    #[allow(dead_code)]
    pub fn dummy() -> Self {
        Self::Dummy
    }

    /// Generates a completion.
    ///
    /// The plain client always performs the request and returns `Some`. The
    /// batching client returns the cached response when present, otherwise
    /// `None` (enqueueing the request for a later batch unless `cache_only`);
    /// `seed` and `cache_only` only affect the batching client.
    pub async fn generate(
        &self,
        model: &str,
        max_tokens: u64,
        messages: Vec<Message>,
        seed: Option<usize>,
        cache_only: bool,
    ) -> Result<Option<AnthropicResponse>> {
        match self {
            AnthropicClient::Plain(plain_llm_client) => plain_llm_client
                .generate(model, max_tokens, messages)
                .await
                .map(Some),
            AnthropicClient::Batch(batching_llm_client) => {
                batching_llm_client
                    .generate(model, max_tokens, messages, seed, cache_only)
                    .await
            }
            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
        }
    }

    /// Streaming generation; only supported by the plain client.
    /// `on_progress` receives the accumulated text length and text.
    #[allow(dead_code)]
    pub async fn generate_streaming<F>(
        &self,
        model: &str,
        max_tokens: u64,
        messages: Vec<Message>,
        on_progress: F,
    ) -> Result<Option<AnthropicResponse>>
    where
        F: FnMut(usize, &str),
    {
        match self {
            AnthropicClient::Plain(plain_llm_client) => plain_llm_client
                .generate_streaming(model, max_tokens, messages, on_progress)
                .await
                .map(Some),
            AnthropicClient::Batch(_) => {
                anyhow::bail!("Streaming not supported with batching client")
            }
            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
        }
    }

    /// Uploads pending batch requests and downloads finished batches.
    /// A no-op for the plain client.
    pub async fn sync_batches(&self) -> Result<()> {
        match self {
            AnthropicClient::Plain(_) => Ok(()),
            AnthropicClient::Batch(batching_llm_client) => batching_llm_client.sync_batches().await,
            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
        }
    }

    /// Imports results from externally-known batch ids; batching client only.
    pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
        match self {
            AnthropicClient::Plain(_) => {
                anyhow::bail!("Import batches is only supported with batching client")
            }
            AnthropicClient::Batch(batching_llm_client) => {
                batching_llm_client.import_batches(batch_ids).await
            }
            AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
        }
    }
}