responses.rs

  1use anyhow::{Result, anyhow};
  2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
  3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
  4use serde::{Deserialize, Serialize};
  5use serde_json::Value;
  6
  7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
  8
  9#[derive(Serialize, Debug)]
 10pub struct Request {
 11    pub model: String,
 12    #[serde(skip_serializing_if = "Option::is_none")]
 13    pub instructions: Option<String>,
 14    #[serde(skip_serializing_if = "Vec::is_empty")]
 15    pub input: Vec<ResponseInputItem>,
 16    #[serde(default)]
 17    pub stream: bool,
 18    #[serde(skip_serializing_if = "Option::is_none")]
 19    pub temperature: Option<f32>,
 20    #[serde(skip_serializing_if = "Option::is_none")]
 21    pub top_p: Option<f32>,
 22    #[serde(skip_serializing_if = "Option::is_none")]
 23    pub max_output_tokens: Option<u64>,
 24    #[serde(skip_serializing_if = "Option::is_none")]
 25    pub parallel_tool_calls: Option<bool>,
 26    #[serde(skip_serializing_if = "Option::is_none")]
 27    pub tool_choice: Option<ToolChoice>,
 28    #[serde(skip_serializing_if = "Vec::is_empty")]
 29    pub tools: Vec<ToolDefinition>,
 30    #[serde(skip_serializing_if = "Option::is_none")]
 31    pub prompt_cache_key: Option<String>,
 32    #[serde(skip_serializing_if = "Option::is_none")]
 33    pub reasoning: Option<ReasoningConfig>,
 34    #[serde(skip_serializing_if = "Option::is_none")]
 35    pub store: Option<bool>,
 36}
 37
 38#[derive(Debug, Serialize, Deserialize)]
 39#[serde(tag = "type", rename_all = "snake_case")]
 40pub enum ResponseInputItem {
 41    Message(ResponseMessageItem),
 42    FunctionCall(ResponseFunctionCallItem),
 43    FunctionCallOutput(ResponseFunctionCallOutputItem),
 44}
 45
 46#[derive(Debug, Serialize, Deserialize)]
 47pub struct ResponseMessageItem {
 48    pub role: Role,
 49    pub content: Vec<ResponseInputContent>,
 50}
 51
 52#[derive(Debug, Serialize, Deserialize)]
 53pub struct ResponseFunctionCallItem {
 54    pub call_id: String,
 55    pub name: String,
 56    pub arguments: String,
 57}
 58
 59#[derive(Debug, Serialize, Deserialize)]
 60pub struct ResponseFunctionCallOutputItem {
 61    pub call_id: String,
 62    pub output: ResponseFunctionCallOutputContent,
 63}
 64
 65#[derive(Debug, Serialize, Deserialize)]
 66#[serde(untagged)]
 67pub enum ResponseFunctionCallOutputContent {
 68    List(Vec<ResponseInputContent>),
 69    Text(String),
 70}
 71
 72#[derive(Debug, Clone, Serialize, Deserialize)]
 73#[serde(tag = "type")]
 74pub enum ResponseInputContent {
 75    #[serde(rename = "input_text")]
 76    Text { text: String },
 77    #[serde(rename = "input_image")]
 78    Image { image_url: String },
 79    #[serde(rename = "output_text")]
 80    OutputText {
 81        text: String,
 82        #[serde(default)]
 83        annotations: Vec<serde_json::Value>,
 84    },
 85    #[serde(rename = "refusal")]
 86    Refusal { refusal: String },
 87}
 88
 89#[derive(Serialize, Debug)]
 90pub struct ReasoningConfig {
 91    pub effort: ReasoningEffort,
 92    #[serde(skip_serializing_if = "Option::is_none")]
 93    pub summary: Option<ReasoningSummaryMode>,
 94}
 95
 96#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
 97#[serde(rename_all = "lowercase")]
 98pub enum ReasoningSummaryMode {
 99    Auto,
100    Concise,
101    Detailed,
102}
103
104#[derive(Serialize, Debug)]
105#[serde(tag = "type", rename_all = "snake_case")]
106pub enum ToolDefinition {
107    Function {
108        name: String,
109        #[serde(skip_serializing_if = "Option::is_none")]
110        description: Option<String>,
111        #[serde(skip_serializing_if = "Option::is_none")]
112        parameters: Option<Value>,
113        #[serde(skip_serializing_if = "Option::is_none")]
114        strict: Option<bool>,
115    },
116}
117
118#[derive(Deserialize, Debug)]
119pub struct Error {
120    pub message: String,
121}
122
123#[derive(Deserialize, Debug)]
124#[serde(tag = "type")]
125pub enum StreamEvent {
126    #[serde(rename = "response.created")]
127    Created { response: ResponseSummary },
128    #[serde(rename = "response.in_progress")]
129    InProgress { response: ResponseSummary },
130    #[serde(rename = "response.output_item.added")]
131    OutputItemAdded {
132        output_index: usize,
133        #[serde(default)]
134        sequence_number: Option<u64>,
135        item: ResponseOutputItem,
136    },
137    #[serde(rename = "response.output_item.done")]
138    OutputItemDone {
139        output_index: usize,
140        #[serde(default)]
141        sequence_number: Option<u64>,
142        item: ResponseOutputItem,
143    },
144    #[serde(rename = "response.content_part.added")]
145    ContentPartAdded {
146        item_id: String,
147        output_index: usize,
148        content_index: usize,
149        part: Value,
150    },
151    #[serde(rename = "response.content_part.done")]
152    ContentPartDone {
153        item_id: String,
154        output_index: usize,
155        content_index: usize,
156        part: Value,
157    },
158    #[serde(rename = "response.output_text.delta")]
159    OutputTextDelta {
160        item_id: String,
161        output_index: usize,
162        #[serde(default)]
163        content_index: Option<usize>,
164        delta: String,
165    },
166    #[serde(rename = "response.output_text.done")]
167    OutputTextDone {
168        item_id: String,
169        output_index: usize,
170        #[serde(default)]
171        content_index: Option<usize>,
172        text: String,
173    },
174    #[serde(rename = "response.reasoning_summary_part.added")]
175    ReasoningSummaryPartAdded {
176        item_id: String,
177        output_index: usize,
178        summary_index: usize,
179    },
180    #[serde(rename = "response.reasoning_summary_text.delta")]
181    ReasoningSummaryTextDelta {
182        item_id: String,
183        output_index: usize,
184        delta: String,
185    },
186    #[serde(rename = "response.reasoning_summary_text.done")]
187    ReasoningSummaryTextDone {
188        item_id: String,
189        output_index: usize,
190        text: String,
191    },
192    #[serde(rename = "response.reasoning_summary_part.done")]
193    ReasoningSummaryPartDone {
194        item_id: String,
195        output_index: usize,
196        summary_index: usize,
197    },
198    #[serde(rename = "response.function_call_arguments.delta")]
199    FunctionCallArgumentsDelta {
200        item_id: String,
201        output_index: usize,
202        delta: String,
203        #[serde(default)]
204        sequence_number: Option<u64>,
205    },
206    #[serde(rename = "response.function_call_arguments.done")]
207    FunctionCallArgumentsDone {
208        item_id: String,
209        output_index: usize,
210        arguments: String,
211        #[serde(default)]
212        sequence_number: Option<u64>,
213    },
214    #[serde(rename = "response.completed")]
215    Completed { response: ResponseSummary },
216    #[serde(rename = "response.incomplete")]
217    Incomplete { response: ResponseSummary },
218    #[serde(rename = "response.failed")]
219    Failed { response: ResponseSummary },
220    #[serde(rename = "response.error")]
221    Error { error: Error },
222    #[serde(rename = "error")]
223    GenericError { error: Error },
224    #[serde(other)]
225    Unknown,
226}
227
228#[derive(Deserialize, Debug, Default, Clone)]
229pub struct ResponseSummary {
230    #[serde(default)]
231    pub id: Option<String>,
232    #[serde(default)]
233    pub status: Option<String>,
234    #[serde(default)]
235    pub status_details: Option<ResponseStatusDetails>,
236    #[serde(default)]
237    pub usage: Option<ResponseUsage>,
238    #[serde(default)]
239    pub output: Vec<ResponseOutputItem>,
240}
241
242#[derive(Deserialize, Debug, Default, Clone)]
243pub struct ResponseStatusDetails {
244    #[serde(default)]
245    pub reason: Option<String>,
246    #[serde(default)]
247    pub r#type: Option<String>,
248    #[serde(default)]
249    pub error: Option<Value>,
250}
251
252#[derive(Deserialize, Debug, Default, Clone)]
253pub struct ResponseUsage {
254    #[serde(default)]
255    pub input_tokens: Option<u64>,
256    #[serde(default)]
257    pub output_tokens: Option<u64>,
258    #[serde(default)]
259    pub total_tokens: Option<u64>,
260}
261
262#[derive(Deserialize, Debug, Clone)]
263#[serde(tag = "type", rename_all = "snake_case")]
264pub enum ResponseOutputItem {
265    Message(ResponseOutputMessage),
266    FunctionCall(ResponseFunctionToolCall),
267    Reasoning(ResponseReasoningItem),
268    #[serde(other)]
269    Unknown,
270}
271
272#[derive(Deserialize, Debug, Clone)]
273pub struct ResponseReasoningItem {
274    #[serde(default)]
275    pub id: Option<String>,
276    #[serde(default)]
277    pub summary: Vec<ReasoningSummaryPart>,
278}
279
280#[derive(Deserialize, Debug, Clone)]
281#[serde(tag = "type", rename_all = "snake_case")]
282pub enum ReasoningSummaryPart {
283    SummaryText {
284        text: String,
285    },
286    #[serde(other)]
287    Unknown,
288}
289
290#[derive(Deserialize, Debug, Clone)]
291pub struct ResponseOutputMessage {
292    #[serde(default)]
293    pub id: Option<String>,
294    #[serde(default)]
295    pub content: Vec<Value>,
296    #[serde(default)]
297    pub role: Option<String>,
298    #[serde(default)]
299    pub status: Option<String>,
300}
301
302#[derive(Deserialize, Debug, Clone)]
303pub struct ResponseFunctionToolCall {
304    #[serde(default)]
305    pub id: Option<String>,
306    #[serde(default)]
307    pub arguments: String,
308    #[serde(default)]
309    pub call_id: Option<String>,
310    #[serde(default)]
311    pub name: Option<String>,
312    #[serde(default)]
313    pub status: Option<String>,
314}
315
316pub async fn stream_response(
317    client: &dyn HttpClient,
318    provider_name: &str,
319    api_url: &str,
320    api_key: &str,
321    request: Request,
322    extra_headers: Vec<(String, String)>,
323) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
324    let uri = format!("{api_url}/responses");
325    let mut request_builder = HttpRequest::builder()
326        .method(Method::POST)
327        .uri(uri)
328        .header("Content-Type", "application/json")
329        .header("Authorization", format!("Bearer {}", api_key.trim()));
330    for (name, value) in &extra_headers {
331        request_builder = request_builder.header(name.as_str(), value.as_str());
332    }
333
334    let is_streaming = request.stream;
335    let request = request_builder
336        .body(AsyncBody::from(
337            serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
338        ))
339        .map_err(|e| RequestError::Other(e.into()))?;
340
341    let mut response = client.send(request).await?;
342    if response.status().is_success() {
343        if is_streaming {
344            let reader = BufReader::new(response.into_body());
345            Ok(reader
346                .lines()
347                .filter_map(|line| async move {
348                    match line {
349                        Ok(line) => {
350                            let line = line
351                                .strip_prefix("data: ")
352                                .or_else(|| line.strip_prefix("data:"))?;
353                            if line == "[DONE]" || line.is_empty() {
354                                None
355                            } else {
356                                match serde_json::from_str::<StreamEvent>(line) {
357                                    Ok(event) => Some(Ok(event)),
358                                    Err(error) => {
359                                        log::error!(
360                                            "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
361                                            error,
362                                            line,
363                                        );
364                                        Some(Err(anyhow!(error)))
365                                    }
366                                }
367                            }
368                        }
369                        Err(error) => Some(Err(anyhow!(error))),
370                    }
371                })
372                .boxed())
373        } else {
374            let mut body = String::new();
375            response
376                .body_mut()
377                .read_to_string(&mut body)
378                .await
379                .map_err(|e| RequestError::Other(e.into()))?;
380
381            match serde_json::from_str::<ResponseSummary>(&body) {
382                Ok(response_summary) => {
383                    let events = vec![
384                        StreamEvent::Created {
385                            response: response_summary.clone(),
386                        },
387                        StreamEvent::InProgress {
388                            response: response_summary.clone(),
389                        },
390                    ];
391
392                    let mut all_events = events;
393                    for (output_index, item) in response_summary.output.iter().enumerate() {
394                        all_events.push(StreamEvent::OutputItemAdded {
395                            output_index,
396                            sequence_number: None,
397                            item: item.clone(),
398                        });
399
400                        match item {
401                            ResponseOutputItem::Message(message) => {
402                                for content_item in &message.content {
403                                    if let Some(text) = content_item.get("text") {
404                                        if let Some(text_str) = text.as_str() {
405                                            if let Some(ref item_id) = message.id {
406                                                all_events.push(StreamEvent::OutputTextDelta {
407                                                    item_id: item_id.clone(),
408                                                    output_index,
409                                                    content_index: None,
410                                                    delta: text_str.to_string(),
411                                                });
412                                            }
413                                        }
414                                    }
415                                }
416                            }
417                            ResponseOutputItem::FunctionCall(function_call) => {
418                                if let Some(ref item_id) = function_call.id {
419                                    all_events.push(StreamEvent::FunctionCallArgumentsDone {
420                                        item_id: item_id.clone(),
421                                        output_index,
422                                        arguments: function_call.arguments.clone(),
423                                        sequence_number: None,
424                                    });
425                                }
426                            }
427                            ResponseOutputItem::Reasoning(reasoning) => {
428                                if let Some(ref item_id) = reasoning.id {
429                                    for part in &reasoning.summary {
430                                        if let ReasoningSummaryPart::SummaryText { text } = part {
431                                            all_events.push(
432                                                StreamEvent::ReasoningSummaryTextDelta {
433                                                    item_id: item_id.clone(),
434                                                    output_index,
435                                                    delta: text.clone(),
436                                                },
437                                            );
438                                        }
439                                    }
440                                }
441                            }
442                            ResponseOutputItem::Unknown => {}
443                        }
444
445                        all_events.push(StreamEvent::OutputItemDone {
446                            output_index,
447                            sequence_number: None,
448                            item: item.clone(),
449                        });
450                    }
451
452                    all_events.push(StreamEvent::Completed {
453                        response: response_summary,
454                    });
455
456                    Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
457                }
458                Err(error) => {
459                    log::error!(
460                        "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
461                        error,
462                        body,
463                    );
464                    Err(RequestError::Other(anyhow!(error)))
465                }
466            }
467        }
468    } else {
469        let mut body = String::new();
470        response
471            .body_mut()
472            .read_to_string(&mut body)
473            .await
474            .map_err(|e| RequestError::Other(e.into()))?;
475
476        Err(RequestError::HttpResponseError {
477            provider: provider_name.to_owned(),
478            status_code: response.status(),
479            body,
480            headers: response.headers().clone(),
481        })
482    }
483}