1use anthropic::{
2 ANTHROPIC_API_URL, Event, Message, Request as AnthropicRequest, RequestContent,
3 Response as AnthropicResponse, ResponseContent, Role, non_streaming_completion,
4 stream_completion,
5};
6use anyhow::Result;
7use futures::StreamExt as _;
8use http_client::HttpClient;
9use indoc::indoc;
10use reqwest_client::ReqwestClient;
11use sqlez::bindable::Bind;
12use sqlez::bindable::StaticColumnCount;
13use sqlez_macros::sql;
14use std::collections::HashSet;
15use std::hash::Hash;
16use std::hash::Hasher;
17use std::path::Path;
18use std::sync::{Arc, Mutex};
19
/// Direct (non-batching) Anthropic API client: every call goes straight to
/// the Messages API.
pub struct PlainLlmClient {
    /// HTTP transport used for all API calls.
    pub http_client: Arc<dyn HttpClient>,
    /// Anthropic API key, read from `ANTHROPIC_API_KEY` in `new`.
    pub api_key: String,
}
24
25impl PlainLlmClient {
26 pub fn new() -> Result<Self> {
27 let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
28 let api_key = std::env::var("ANTHROPIC_API_KEY")
29 .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;
30 Ok(Self {
31 http_client,
32 api_key,
33 })
34 }
35
36 pub async fn generate(
37 &self,
38 model: &str,
39 max_tokens: u64,
40 messages: Vec<Message>,
41 ) -> Result<AnthropicResponse> {
42 let request = AnthropicRequest {
43 model: model.to_string(),
44 max_tokens,
45 messages,
46 tools: Vec::new(),
47 thinking: None,
48 tool_choice: None,
49 system: None,
50 metadata: None,
51 output_config: None,
52 stop_sequences: Vec::new(),
53 speed: None,
54 temperature: None,
55 top_k: None,
56 top_p: None,
57 };
58
59 let response = non_streaming_completion(
60 self.http_client.as_ref(),
61 ANTHROPIC_API_URL,
62 &self.api_key,
63 request,
64 None,
65 )
66 .await
67 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
68
69 Ok(response)
70 }
71
72 pub async fn generate_streaming<F>(
73 &self,
74 model: &str,
75 max_tokens: u64,
76 messages: Vec<Message>,
77 mut on_progress: F,
78 ) -> Result<AnthropicResponse>
79 where
80 F: FnMut(usize, &str),
81 {
82 let request = AnthropicRequest {
83 model: model.to_string(),
84 max_tokens,
85 messages,
86 tools: Vec::new(),
87 thinking: None,
88 tool_choice: None,
89 system: None,
90 metadata: None,
91 output_config: None,
92 stop_sequences: Vec::new(),
93 speed: None,
94 temperature: None,
95 top_k: None,
96 top_p: None,
97 };
98
99 let mut stream = stream_completion(
100 self.http_client.as_ref(),
101 ANTHROPIC_API_URL,
102 &self.api_key,
103 request,
104 None,
105 )
106 .await
107 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
108
109 let mut response: Option<AnthropicResponse> = None;
110 let mut text_content = String::new();
111
112 while let Some(event_result) = stream.next().await {
113 let event = event_result.map_err(|e| anyhow::anyhow!("{:?}", e))?;
114
115 match event {
116 Event::MessageStart { message } => {
117 response = Some(message);
118 }
119 Event::ContentBlockDelta { delta, .. } => {
120 if let anthropic::ContentDelta::TextDelta { text } = delta {
121 text_content.push_str(&text);
122 on_progress(text_content.len(), &text_content);
123 }
124 }
125 _ => {}
126 }
127 }
128
129 let mut response = response.ok_or_else(|| anyhow::anyhow!("No response received"))?;
130
131 if response.content.is_empty() && !text_content.is_empty() {
132 response
133 .content
134 .push(ResponseContent::Text { text: text_content });
135 }
136
137 Ok(response)
138 }
139}
140
/// Anthropic client that defers work to the Message Batches API, caching
/// requests and responses in a local SQLite database.
pub struct BatchingLlmClient {
    /// Mutex-guarded SQLite connection to the request/response cache.
    connection: Mutex<sqlez::connection::Connection>,
    /// HTTP transport used for all API calls.
    http_client: Arc<dyn HttpClient>,
    /// Anthropic API key, read from `ANTHROPIC_API_KEY` in `new`.
    api_key: String,
}
146
/// One row of the `cache` table created in `BatchingLlmClient::new`.
struct CacheRow {
    // Primary key: hex-formatted hash of (model, max_tokens, message texts, seed).
    request_hash: String,
    // Serialized `SerializableRequest` JSON, when recorded.
    request: Option<String>,
    // Serialized response JSON (a success message or an `{"error": …}` object);
    // `None` while the request is still pending.
    response: Option<String>,
    // Id of the uploaded Anthropic batch containing this request; `None` until uploaded.
    batch_id: Option<String>,
}
153
impl StaticColumnCount for CacheRow {
    /// `CacheRow` binds exactly four SQL columns:
    /// `request_hash`, `request`, `response`, `batch_id`.
    fn column_count() -> usize {
        4
    }
}
159
160impl Bind for CacheRow {
161 fn bind(&self, statement: &sqlez::statement::Statement, start_index: i32) -> Result<i32> {
162 let next_index = statement.bind(&self.request_hash, start_index)?;
163 let next_index = statement.bind(&self.request, next_index)?;
164 let next_index = statement.bind(&self.response, next_index)?;
165 let next_index = statement.bind(&self.batch_id, next_index)?;
166 Ok(next_index)
167 }
168}
169
/// JSON-serializable mirror of a request, as stored in the cache's
/// `request` column and rebuilt into an `AnthropicRequest` at upload time.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableRequest {
    model: String,
    max_tokens: u64,
    messages: Vec<SerializableMessage>,
}
176
/// JSON-serializable mirror of a message.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerializableMessage {
    // "user" or "assistant" (see `mark_for_batch` / `upload_pending_requests`).
    role: String,
    // Newline-joined text parts of the message (see `message_content_to_string`).
    content: String,
}
182
impl BatchingLlmClient {
    /// Opens (or creates) the SQLite cache at `cache_path` and ensures the
    /// `cache` table exists.
    ///
    /// # Errors
    /// Fails if `ANTHROPIC_API_KEY` is not set or the table-creation
    /// statement cannot be prepared/executed.
    ///
    /// NOTE(review): `cache_path.to_str().unwrap()` panics on a non-UTF-8
    /// path — confirm that is acceptable here.
    fn new(cache_path: &Path) -> Result<Self> {
        let http_client: Arc<dyn http_client::HttpClient> = Arc::new(ReqwestClient::new());
        let api_key = std::env::var("ANTHROPIC_API_KEY")
            .map_err(|_| anyhow::anyhow!("ANTHROPIC_API_KEY environment variable not set"))?;

        let connection = sqlez::connection::Connection::open_file(&cache_path.to_str().unwrap());
        let mut statement = sqlez::statement::Statement::prepare(
            &connection,
            indoc! {"
                CREATE TABLE IF NOT EXISTS cache (
                    request_hash TEXT PRIMARY KEY,
                    request TEXT,
                    response TEXT,
                    batch_id TEXT
                );
            "},
        )?;
        statement.exec()?;
        drop(statement);

        Ok(Self {
            connection: Mutex::new(connection),
            http_client,
            api_key,
        })
    }

    /// Looks up a cached response for the given request parameters.
    ///
    /// Returns `Ok(None)` on a cache miss.
    ///
    /// NOTE(review): errored results are stored in the `response` column as
    /// an `{"error": …}` JSON object, which fails to deserialize into
    /// `AnthropicResponse` and is silently treated as a miss here; since the
    /// row already has a non-NULL response it will never be re-uploaded
    /// either — confirm permanently-errored requests are handled as intended.
    pub fn lookup(
        &self,
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> Result<Option<AnthropicResponse>> {
        let request_hash_str = Self::request_hash(model, max_tokens, messages, seed);
        let connection = self.connection.lock().unwrap();
        let response: Vec<String> = connection.select_bound(
            &sql!(SELECT response FROM cache WHERE request_hash = ?1 AND response IS NOT NULL;),
        )?(request_hash_str.as_str())?;
        Ok(response
            .into_iter()
            .next()
            .and_then(|text| serde_json::from_str(&text).ok()))
    }

    /// Records a request in the cache (no response, no batch id yet) so that
    /// a later `upload_pending_requests` includes it in a batch.
    ///
    /// Uses `INSERT OR IGNORE`, so re-marking an already cached or in-flight
    /// request leaves the existing row untouched.
    pub fn mark_for_batch(
        &self,
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> Result<()> {
        let request_hash = Self::request_hash(model, max_tokens, messages, seed);

        // Flatten each message to a (role, joined-text) pair for JSON storage.
        let serializable_messages: Vec<SerializableMessage> = messages
            .iter()
            .map(|msg| SerializableMessage {
                role: match msg.role {
                    Role::User => "user".to_string(),
                    Role::Assistant => "assistant".to_string(),
                },
                content: message_content_to_string(&msg.content),
            })
            .collect();

        let serializable_request = SerializableRequest {
            model: model.to_string(),
            max_tokens,
            messages: serializable_messages,
        };

        let request = Some(serde_json::to_string(&serializable_request)?);
        let cache_row = CacheRow {
            request_hash,
            request,
            response: None,
            batch_id: None,
        };
        let connection = self.connection.lock().unwrap();
        connection.exec_bound::<CacheRow>(sql!(
            INSERT OR IGNORE INTO cache(request_hash, request, response, batch_id) VALUES (?, ?, ?, ?)))?(
            cache_row,
        )
    }

    /// Cache-first generation: returns the cached response when present;
    /// otherwise returns `None`, first marking the request for a future
    /// batch upload unless `cache_only` is set.
    async fn generate(
        &self,
        model: &str,
        max_tokens: u64,
        messages: Vec<Message>,
        seed: Option<usize>,
        cache_only: bool,
    ) -> Result<Option<AnthropicResponse>> {
        let response = self.lookup(model, max_tokens, &messages, seed)?;
        if let Some(response) = response {
            return Ok(Some(response));
        }

        if !cache_only {
            self.mark_for_batch(model, max_tokens, &messages, seed)?;
        }

        Ok(None)
    }

    /// Uploads pending requests as batches (chunked to 16k each); downloads finished batches if any.
    async fn sync_batches(&self) -> Result<()> {
        let _batch_ids = self.upload_pending_requests().await?;
        self.download_finished_batches().await
    }

    /// Import batch results from external batch IDs (useful for recovering after database loss)
    ///
    /// Unfinished batches are logged and skipped; successful and errored
    /// results are both written into the cache (errors as `{"error": …}`
    /// JSON), while canceled/expired entries are only counted and logged.
    pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
        for batch_id in batch_ids {
            log::info!("Importing batch {}", batch_id);

            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("Failed to retrieve batch {}: {:?}", batch_id, e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            if batch_status.processing_status != "ended" {
                log::warn!(
                    "Batch {} is not finished (status: {}), skipping",
                    batch_id,
                    batch_status.processing_status
                );
                continue;
            }

            let results = anthropic::batches::retrieve_batch_results(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                batch_id,
            )
            .await
            .map_err(|e| {
                anyhow::anyhow!("Failed to retrieve batch results for {}: {:?}", batch_id, e)
            })?;

            // (request_hash, response JSON, batch id) triples to persist.
            let mut updates: Vec<(String, String, String)> = Vec::new();
            let mut success_count = 0;
            let mut error_count = 0;

            for result in results {
                // custom_id was written as "req_hash_<hash>" at upload time;
                // tolerate ids without the prefix.
                let request_hash = result
                    .custom_id
                    .strip_prefix("req_hash_")
                    .unwrap_or(&result.custom_id)
                    .to_string();

                match result.result {
                    anthropic::batches::BatchResult::Succeeded { message } => {
                        let response_json = serde_json::to_string(&message)?;
                        updates.push((request_hash, response_json, batch_id.clone()));
                        success_count += 1;
                    }
                    anthropic::batches::BatchResult::Errored { error } => {
                        log::error!(
                            "Batch request {} failed: {}: {}",
                            request_hash,
                            error.error.error_type,
                            error.error.message
                        );
                        let error_json = serde_json::json!({
                            "error": {
                                "type": error.error.error_type,
                                "message": error.error.message
                            }
                        })
                        .to_string();
                        updates.push((request_hash, error_json, batch_id.clone()));
                        error_count += 1;
                    }
                    anthropic::batches::BatchResult::Canceled => {
                        log::warn!("Batch request {} was canceled", request_hash);
                        error_count += 1;
                    }
                    anthropic::batches::BatchResult::Expired => {
                        log::warn!("Batch request {} expired", request_hash);
                        error_count += 1;
                    }
                }
            }

            // Persist all results for this batch atomically.
            let connection = self.connection.lock().unwrap();
            connection.with_savepoint("batch_import", || {
                // Use INSERT OR REPLACE to handle both new entries and updating existing ones
                let q = sql!(
                    INSERT OR REPLACE INTO cache(request_hash, request, response, batch_id)
                    VALUES (?, (SELECT request FROM cache WHERE request_hash = ?), ?, ?)
                );
                let mut exec = connection.exec_bound::<(&str, &str, &str, &str)>(q)?;
                for (request_hash, response_json, batch_id) in &updates {
                    // The hash is bound twice: once as the new primary key and
                    // once inside the subselect that preserves the stored request.
                    exec((
                        request_hash.as_str(),
                        request_hash.as_str(),
                        response_json.as_str(),
                        batch_id.as_str(),
                    ))?;
                }
                Ok(())
            })?;

            log::info!(
                "Imported batch {}: {} successful, {} errors",
                batch_id,
                success_count,
                error_count
            );
        }

        Ok(())
    }

    /// Polls every batch that still has unanswered rows; for each batch whose
    /// processing has "ended", writes every result (success, error, canceled,
    /// or expired — the latter three as `{"error": …}` JSON) into the
    /// `response` column.
    async fn download_finished_batches(&self) -> Result<()> {
        // Batches that were uploaded but still have at least one row
        // without a stored response.
        let batch_ids: Vec<String> = {
            let connection = self.connection.lock().unwrap();
            let q = sql!(SELECT DISTINCT batch_id FROM cache WHERE batch_id IS NOT NULL AND response IS NULL);
            connection.select(q)?()?
        };

        for batch_id in &batch_ids {
            let batch_status = anthropic::batches::retrieve_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                &batch_id,
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            log::info!(
                "Batch {} status: {}",
                batch_id,
                batch_status.processing_status
            );

            if batch_status.processing_status == "ended" {
                let results = anthropic::batches::retrieve_batch_results(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    &batch_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // (response JSON, request_hash) pairs to persist.
                let mut updates: Vec<(String, String)> = Vec::new();
                let mut success_count = 0;
                for result in results {
                    // custom_id was written as "req_hash_<hash>" at upload time.
                    let request_hash = result
                        .custom_id
                        .strip_prefix("req_hash_")
                        .unwrap_or(&result.custom_id)
                        .to_string();

                    match result.result {
                        anthropic::batches::BatchResult::Succeeded { message } => {
                            let response_json = serde_json::to_string(&message)?;
                            updates.push((response_json, request_hash));
                            success_count += 1;
                        }
                        anthropic::batches::BatchResult::Errored { error } => {
                            log::error!(
                                "Batch request {} failed: {}: {}",
                                request_hash,
                                error.error.error_type,
                                error.error.message
                            );
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": error.error.error_type,
                                    "message": error.error.message
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Canceled => {
                            log::warn!("Batch request {} was canceled", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "canceled",
                                    "message": "Batch request was canceled"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                        anthropic::batches::BatchResult::Expired => {
                            log::warn!("Batch request {} expired", request_hash);
                            let error_json = serde_json::json!({
                                "error": {
                                    "type": "expired",
                                    "message": "Batch request expired"
                                }
                            })
                            .to_string();
                            updates.push((error_json, request_hash));
                        }
                    }
                }

                // Persist all responses for this batch atomically.
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_download", || {
                    let q = sql!(UPDATE cache SET response = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for (response_json, request_hash) in &updates {
                        exec((response_json.as_str(), request_hash.as_str()))?;
                    }
                    Ok(())
                })?;
                log::info!("Downloaded {} successful requests", success_count);
            }
        }

        Ok(())
    }

    /// Uploads every cached row with no `batch_id` and no `response` to the
    /// Message Batches API.
    ///
    /// Rows are read in pages of `BATCH_CHUNK_SIZE`, and accumulated into
    /// batches capped at roughly `MAX_BATCH_SIZE_BYTES` of serialized
    /// payload. Only full batches are uploaded inside the loop; a final
    /// partial batch, if any, is uploaded at the end. `pending_hashes`
    /// tracks rows already staged (but not yet uploaded / marked in the DB)
    /// so re-reading the same page cannot stage a row twice.
    ///
    /// Returns the ids of all batches created.
    async fn upload_pending_requests(&self) -> Result<Vec<String>> {
        // Rows fetched from SQLite per iteration.
        const BATCH_CHUNK_SIZE: i32 = 16_000;
        // Approximate cap on the serialized size of one uploaded batch.
        const MAX_BATCH_SIZE_BYTES: usize = 100 * 1024 * 1024;

        let mut all_batch_ids = Vec::new();
        let mut total_uploaded = 0;

        let mut current_batch_rows = Vec::new();
        let mut current_batch_size = 0usize;
        let mut pending_hashes: HashSet<String> = HashSet::new();
        loop {
            let rows: Vec<(String, String)> = {
                let connection = self.connection.lock().unwrap();
                let q = sql!(
                    SELECT request_hash, request FROM cache
                    WHERE batch_id IS NULL AND response IS NULL
                    LIMIT ?
                );
                connection.select_bound(q)?(BATCH_CHUNK_SIZE)?
            };

            if rows.is_empty() {
                break;
            }

            // Split rows into sub-batches based on size
            let mut batches_to_upload = Vec::new();
            let mut new_rows_added = 0;

            for row in rows {
                let (hash, request_str) = row;

                // Skip rows already added to current_batch_rows but not yet uploaded
                if pending_hashes.contains(&hash) {
                    continue;
                }
                let serializable_request: SerializableRequest = serde_json::from_str(&request_str)?;

                // Rebuild the API message list from the stored JSON;
                // unknown roles fall back to User.
                let messages: Vec<Message> = serializable_request
                    .messages
                    .into_iter()
                    .map(|msg| Message {
                        role: match msg.role.as_str() {
                            "user" => Role::User,
                            "assistant" => Role::Assistant,
                            _ => Role::User,
                        },
                        content: vec![RequestContent::Text {
                            text: msg.content,
                            cache_control: None,
                        }],
                    })
                    .collect();

                let params = AnthropicRequest {
                    model: serializable_request.model,
                    max_tokens: serializable_request.max_tokens,
                    messages,
                    tools: Vec::new(),
                    thinking: None,
                    tool_choice: None,
                    system: None,
                    metadata: None,
                    output_config: None,
                    stop_sequences: Vec::new(),
                    temperature: None,
                    top_k: None,
                    top_p: None,
                    speed: None,
                };

                // custom_id carries the cache key so results can be matched
                // back up in download_finished_batches / import_batches.
                let custom_id = format!("req_hash_{}", hash);
                let batch_request = anthropic::batches::BatchRequest { custom_id, params };

                // Estimate the serialized size of this request
                let estimated_size = serde_json::to_string(&batch_request)?.len();

                // If adding this request would exceed the limit, start a new batch
                if !current_batch_rows.is_empty()
                    && current_batch_size + estimated_size > MAX_BATCH_SIZE_BYTES
                {
                    batches_to_upload.push((current_batch_rows, current_batch_size));
                    current_batch_rows = Vec::new();
                    current_batch_size = 0;
                }

                pending_hashes.insert(hash.clone());
                current_batch_rows.push((hash, batch_request));
                current_batch_size += estimated_size;
                new_rows_added += 1;
            }

            // If no new rows were added this iteration, all pending requests are already
            // in current_batch_rows, so we should break to avoid an infinite loop
            if new_rows_added == 0 {
                break;
            }

            // Only upload full batches, keep the partial batch for the next iteration
            // Upload each sub-batch
            for (batch_rows, batch_size) in batches_to_upload {
                let request_hashes: Vec<String> =
                    batch_rows.iter().map(|(hash, _)| hash.clone()).collect();

                // Remove uploaded hashes from pending set
                for hash in &request_hashes {
                    pending_hashes.remove(hash);
                }
                let batch_requests: Vec<anthropic::batches::BatchRequest> =
                    batch_rows.into_iter().map(|(_, req)| req).collect();

                let batch_len = batch_requests.len();
                log::info!(
                    "Uploading batch with {} requests (~{:.2} MB)",
                    batch_len,
                    batch_size as f64 / (1024.0 * 1024.0)
                );

                let batch = anthropic::batches::create_batch(
                    self.http_client.as_ref(),
                    ANTHROPIC_API_URL,
                    &self.api_key,
                    anthropic::batches::CreateBatchRequest {
                        requests: batch_requests,
                    },
                )
                .await
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;

                // Mark every uploaded row with its batch id so the next
                // SELECT no longer returns it.
                {
                    let connection = self.connection.lock().unwrap();
                    connection.with_savepoint("batch_upload", || {
                        let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                        let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                        for hash in &request_hashes {
                            exec((batch.id.as_str(), hash.as_str()))?;
                        }
                        Ok(())
                    })?;
                }

                total_uploaded += batch_len;
                log::info!(
                    "Uploaded batch {} with {} requests ({} total)",
                    batch.id,
                    batch_len,
                    total_uploaded
                );

                all_batch_ids.push(batch.id);
            }
        }

        // Upload any remaining partial batch at the end
        if !current_batch_rows.is_empty() {
            let request_hashes: Vec<String> = current_batch_rows
                .iter()
                .map(|(hash, _)| hash.clone())
                .collect();
            let batch_requests: Vec<anthropic::batches::BatchRequest> =
                current_batch_rows.into_iter().map(|(_, req)| req).collect();

            let batch_len = batch_requests.len();
            log::info!(
                "Uploading final batch with {} requests (~{:.2} MB)",
                batch_len,
                current_batch_size as f64 / (1024.0 * 1024.0)
            );

            let batch = anthropic::batches::create_batch(
                self.http_client.as_ref(),
                ANTHROPIC_API_URL,
                &self.api_key,
                anthropic::batches::CreateBatchRequest {
                    requests: batch_requests,
                },
            )
            .await
            .map_err(|e| anyhow::anyhow!("{:?}", e))?;

            // Mark the uploaded rows with their batch id.
            {
                let connection = self.connection.lock().unwrap();
                connection.with_savepoint("batch_upload", || {
                    let q = sql!(UPDATE cache SET batch_id = ? WHERE request_hash = ?);
                    let mut exec = connection.exec_bound::<(&str, &str)>(q)?;
                    for hash in &request_hashes {
                        exec((batch.id.as_str(), hash.as_str()))?;
                    }
                    Ok(())
                })?;
            }

            total_uploaded += batch_len;
            log::info!(
                "Uploaded batch {} with {} requests ({} total)",
                batch.id,
                batch_len,
                total_uploaded
            );

            all_batch_ids.push(batch.id);
        }

        if !all_batch_ids.is_empty() {
            log::info!(
                "Finished uploading {} batches with {} total requests",
                all_batch_ids.len(),
                total_uploaded
            );
        }

        Ok(all_batch_ids)
    }

    /// Derives the cache key for a request as a 16-hex-digit string.
    ///
    /// NOTE(review): `DefaultHasher` is not guaranteed stable across Rust
    /// releases, so a toolchain upgrade can invalidate the cache; also only
    /// message text is hashed, not roles, so two conversations with the same
    /// texts but different role assignments collide — confirm both are
    /// acceptable for a persistent cache key.
    fn request_hash(
        model: &str,
        max_tokens: u64,
        messages: &[Message],
        seed: Option<usize>,
    ) -> String {
        let mut hasher = std::hash::DefaultHasher::new();
        model.hash(&mut hasher);
        max_tokens.hash(&mut hasher);
        for msg in messages {
            message_content_to_string(&msg.content).hash(&mut hasher);
        }
        if let Some(seed) = seed {
            seed.hash(&mut hasher);
        }
        let request_hash = hasher.finish();
        format!("{request_hash:016x}")
    }
}
748
749fn message_content_to_string(content: &[RequestContent]) -> String {
750 content
751 .iter()
752 .filter_map(|c| match c {
753 RequestContent::Text { text, .. } => Some(text.clone()),
754 _ => None,
755 })
756 .collect::<Vec<String>>()
757 .join("\n")
758}
759
/// Front-end that dispatches to one of the concrete client implementations.
pub enum AnthropicClient {
    // No batching
    Plain(PlainLlmClient),
    // Batched, cache-backed generation via the Message Batches API.
    Batch(BatchingLlmClient),
    // Placeholder that panics on any use.
    Dummy,
}
766
767impl AnthropicClient {
768 pub fn plain() -> Result<Self> {
769 Ok(Self::Plain(PlainLlmClient::new()?))
770 }
771
772 pub fn batch(cache_path: &Path) -> Result<Self> {
773 Ok(Self::Batch(BatchingLlmClient::new(cache_path)?))
774 }
775
776 #[allow(dead_code)]
777 pub fn dummy() -> Self {
778 Self::Dummy
779 }
780
781 pub async fn generate(
782 &self,
783 model: &str,
784 max_tokens: u64,
785 messages: Vec<Message>,
786 seed: Option<usize>,
787 cache_only: bool,
788 ) -> Result<Option<AnthropicResponse>> {
789 match self {
790 AnthropicClient::Plain(plain_llm_client) => plain_llm_client
791 .generate(model, max_tokens, messages)
792 .await
793 .map(Some),
794 AnthropicClient::Batch(batching_llm_client) => {
795 batching_llm_client
796 .generate(model, max_tokens, messages, seed, cache_only)
797 .await
798 }
799 AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
800 }
801 }
802
803 #[allow(dead_code)]
804 pub async fn generate_streaming<F>(
805 &self,
806 model: &str,
807 max_tokens: u64,
808 messages: Vec<Message>,
809 on_progress: F,
810 ) -> Result<Option<AnthropicResponse>>
811 where
812 F: FnMut(usize, &str),
813 {
814 match self {
815 AnthropicClient::Plain(plain_llm_client) => plain_llm_client
816 .generate_streaming(model, max_tokens, messages, on_progress)
817 .await
818 .map(Some),
819 AnthropicClient::Batch(_) => {
820 anyhow::bail!("Streaming not supported with batching client")
821 }
822 AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
823 }
824 }
825
826 pub async fn sync_batches(&self) -> Result<()> {
827 match self {
828 AnthropicClient::Plain(_) => Ok(()),
829 AnthropicClient::Batch(batching_llm_client) => batching_llm_client.sync_batches().await,
830 AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
831 }
832 }
833
834 pub async fn import_batches(&self, batch_ids: &[String]) -> Result<()> {
835 match self {
836 AnthropicClient::Plain(_) => {
837 anyhow::bail!("Import batches is only supported with batching client")
838 }
839 AnthropicClient::Batch(batching_llm_client) => {
840 batching_llm_client.import_batches(batch_ids).await
841 }
842 AnthropicClient::Dummy => panic!("Dummy LLM client is not expected to be used"),
843 }
844 }
845}