responses.rs

  1use anyhow::{Result, anyhow};
  2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
  3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
  4use serde::{Deserialize, Serialize};
  5use serde_json::Value;
  6
  7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
  8
  9#[derive(Serialize, Debug)]
 10pub struct Request {
 11    pub model: String,
 12    #[serde(skip_serializing_if = "Vec::is_empty")]
 13    pub input: Vec<ResponseInputItem>,
 14    #[serde(default)]
 15    pub stream: bool,
 16    #[serde(skip_serializing_if = "Option::is_none")]
 17    pub temperature: Option<f32>,
 18    #[serde(skip_serializing_if = "Option::is_none")]
 19    pub top_p: Option<f32>,
 20    #[serde(skip_serializing_if = "Option::is_none")]
 21    pub max_output_tokens: Option<u64>,
 22    #[serde(skip_serializing_if = "Option::is_none")]
 23    pub parallel_tool_calls: Option<bool>,
 24    #[serde(skip_serializing_if = "Option::is_none")]
 25    pub tool_choice: Option<ToolChoice>,
 26    #[serde(skip_serializing_if = "Vec::is_empty")]
 27    pub tools: Vec<ToolDefinition>,
 28    #[serde(skip_serializing_if = "Option::is_none")]
 29    pub prompt_cache_key: Option<String>,
 30    #[serde(skip_serializing_if = "Option::is_none")]
 31    pub reasoning: Option<ReasoningConfig>,
 32}
 33
 34#[derive(Debug, Serialize, Deserialize)]
 35#[serde(tag = "type", rename_all = "snake_case")]
 36pub enum ResponseInputItem {
 37    Message(ResponseMessageItem),
 38    FunctionCall(ResponseFunctionCallItem),
 39    FunctionCallOutput(ResponseFunctionCallOutputItem),
 40}
 41
 42#[derive(Debug, Serialize, Deserialize)]
 43pub struct ResponseMessageItem {
 44    pub role: Role,
 45    pub content: Vec<ResponseInputContent>,
 46}
 47
 48#[derive(Debug, Serialize, Deserialize)]
 49pub struct ResponseFunctionCallItem {
 50    pub call_id: String,
 51    pub name: String,
 52    pub arguments: String,
 53}
 54
 55#[derive(Debug, Serialize, Deserialize)]
 56pub struct ResponseFunctionCallOutputItem {
 57    pub call_id: String,
 58    pub output: String,
 59}
 60
 61#[derive(Debug, Clone, Serialize, Deserialize)]
 62#[serde(tag = "type")]
 63pub enum ResponseInputContent {
 64    #[serde(rename = "input_text")]
 65    Text { text: String },
 66    #[serde(rename = "input_image")]
 67    Image { image_url: String },
 68    #[serde(rename = "output_text")]
 69    OutputText {
 70        text: String,
 71        #[serde(default)]
 72        annotations: Vec<serde_json::Value>,
 73    },
 74    #[serde(rename = "refusal")]
 75    Refusal { refusal: String },
 76}
 77
 78#[derive(Serialize, Debug)]
 79pub struct ReasoningConfig {
 80    pub effort: ReasoningEffort,
 81    #[serde(skip_serializing_if = "Option::is_none")]
 82    pub summary: Option<ReasoningSummaryMode>,
 83}
 84
 85#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
 86#[serde(rename_all = "lowercase")]
 87pub enum ReasoningSummaryMode {
 88    Auto,
 89    Concise,
 90    Detailed,
 91}
 92
 93#[derive(Serialize, Debug)]
 94#[serde(tag = "type", rename_all = "snake_case")]
 95pub enum ToolDefinition {
 96    Function {
 97        name: String,
 98        #[serde(skip_serializing_if = "Option::is_none")]
 99        description: Option<String>,
100        #[serde(skip_serializing_if = "Option::is_none")]
101        parameters: Option<Value>,
102        #[serde(skip_serializing_if = "Option::is_none")]
103        strict: Option<bool>,
104    },
105}
106
107#[derive(Deserialize, Debug)]
108pub struct Error {
109    pub message: String,
110}
111
112#[derive(Deserialize, Debug)]
113#[serde(tag = "type")]
114pub enum StreamEvent {
115    #[serde(rename = "response.created")]
116    Created { response: ResponseSummary },
117    #[serde(rename = "response.in_progress")]
118    InProgress { response: ResponseSummary },
119    #[serde(rename = "response.output_item.added")]
120    OutputItemAdded {
121        output_index: usize,
122        #[serde(default)]
123        sequence_number: Option<u64>,
124        item: ResponseOutputItem,
125    },
126    #[serde(rename = "response.output_item.done")]
127    OutputItemDone {
128        output_index: usize,
129        #[serde(default)]
130        sequence_number: Option<u64>,
131        item: ResponseOutputItem,
132    },
133    #[serde(rename = "response.content_part.added")]
134    ContentPartAdded {
135        item_id: String,
136        output_index: usize,
137        content_index: usize,
138        part: Value,
139    },
140    #[serde(rename = "response.content_part.done")]
141    ContentPartDone {
142        item_id: String,
143        output_index: usize,
144        content_index: usize,
145        part: Value,
146    },
147    #[serde(rename = "response.output_text.delta")]
148    OutputTextDelta {
149        item_id: String,
150        output_index: usize,
151        #[serde(default)]
152        content_index: Option<usize>,
153        delta: String,
154    },
155    #[serde(rename = "response.output_text.done")]
156    OutputTextDone {
157        item_id: String,
158        output_index: usize,
159        #[serde(default)]
160        content_index: Option<usize>,
161        text: String,
162    },
163    #[serde(rename = "response.reasoning_summary_part.added")]
164    ReasoningSummaryPartAdded {
165        item_id: String,
166        output_index: usize,
167        summary_index: usize,
168    },
169    #[serde(rename = "response.reasoning_summary_text.delta")]
170    ReasoningSummaryTextDelta {
171        item_id: String,
172        output_index: usize,
173        delta: String,
174    },
175    #[serde(rename = "response.reasoning_summary_text.done")]
176    ReasoningSummaryTextDone {
177        item_id: String,
178        output_index: usize,
179        text: String,
180    },
181    #[serde(rename = "response.reasoning_summary_part.done")]
182    ReasoningSummaryPartDone {
183        item_id: String,
184        output_index: usize,
185        summary_index: usize,
186    },
187    #[serde(rename = "response.function_call_arguments.delta")]
188    FunctionCallArgumentsDelta {
189        item_id: String,
190        output_index: usize,
191        delta: String,
192        #[serde(default)]
193        sequence_number: Option<u64>,
194    },
195    #[serde(rename = "response.function_call_arguments.done")]
196    FunctionCallArgumentsDone {
197        item_id: String,
198        output_index: usize,
199        arguments: String,
200        #[serde(default)]
201        sequence_number: Option<u64>,
202    },
203    #[serde(rename = "response.completed")]
204    Completed { response: ResponseSummary },
205    #[serde(rename = "response.incomplete")]
206    Incomplete { response: ResponseSummary },
207    #[serde(rename = "response.failed")]
208    Failed { response: ResponseSummary },
209    #[serde(rename = "response.error")]
210    Error { error: Error },
211    #[serde(rename = "error")]
212    GenericError { error: Error },
213    #[serde(other)]
214    Unknown,
215}
216
217#[derive(Deserialize, Debug, Default, Clone)]
218pub struct ResponseSummary {
219    #[serde(default)]
220    pub id: Option<String>,
221    #[serde(default)]
222    pub status: Option<String>,
223    #[serde(default)]
224    pub status_details: Option<ResponseStatusDetails>,
225    #[serde(default)]
226    pub usage: Option<ResponseUsage>,
227    #[serde(default)]
228    pub output: Vec<ResponseOutputItem>,
229}
230
231#[derive(Deserialize, Debug, Default, Clone)]
232pub struct ResponseStatusDetails {
233    #[serde(default)]
234    pub reason: Option<String>,
235    #[serde(default)]
236    pub r#type: Option<String>,
237    #[serde(default)]
238    pub error: Option<Value>,
239}
240
241#[derive(Deserialize, Debug, Default, Clone)]
242pub struct ResponseUsage {
243    #[serde(default)]
244    pub input_tokens: Option<u64>,
245    #[serde(default)]
246    pub output_tokens: Option<u64>,
247    #[serde(default)]
248    pub total_tokens: Option<u64>,
249}
250
251#[derive(Deserialize, Debug, Clone)]
252#[serde(tag = "type", rename_all = "snake_case")]
253pub enum ResponseOutputItem {
254    Message(ResponseOutputMessage),
255    FunctionCall(ResponseFunctionToolCall),
256    Reasoning(ResponseReasoningItem),
257    #[serde(other)]
258    Unknown,
259}
260
261#[derive(Deserialize, Debug, Clone)]
262pub struct ResponseReasoningItem {
263    #[serde(default)]
264    pub id: Option<String>,
265    #[serde(default)]
266    pub summary: Vec<ReasoningSummaryPart>,
267}
268
269#[derive(Deserialize, Debug, Clone)]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum ReasoningSummaryPart {
272    SummaryText {
273        text: String,
274    },
275    #[serde(other)]
276    Unknown,
277}
278
279#[derive(Deserialize, Debug, Clone)]
280pub struct ResponseOutputMessage {
281    #[serde(default)]
282    pub id: Option<String>,
283    #[serde(default)]
284    pub content: Vec<Value>,
285    #[serde(default)]
286    pub role: Option<String>,
287    #[serde(default)]
288    pub status: Option<String>,
289}
290
291#[derive(Deserialize, Debug, Clone)]
292pub struct ResponseFunctionToolCall {
293    #[serde(default)]
294    pub id: Option<String>,
295    #[serde(default)]
296    pub arguments: String,
297    #[serde(default)]
298    pub call_id: Option<String>,
299    #[serde(default)]
300    pub name: Option<String>,
301    #[serde(default)]
302    pub status: Option<String>,
303}
304
305pub async fn stream_response(
306    client: &dyn HttpClient,
307    provider_name: &str,
308    api_url: &str,
309    api_key: &str,
310    request: Request,
311) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
312    let uri = format!("{api_url}/responses");
313    let request_builder = HttpRequest::builder()
314        .method(Method::POST)
315        .uri(uri)
316        .header("Content-Type", "application/json")
317        .header("Authorization", format!("Bearer {}", api_key.trim()));
318
319    let is_streaming = request.stream;
320    let request = request_builder
321        .body(AsyncBody::from(
322            serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
323        ))
324        .map_err(|e| RequestError::Other(e.into()))?;
325
326    let mut response = client.send(request).await?;
327    if response.status().is_success() {
328        if is_streaming {
329            let reader = BufReader::new(response.into_body());
330            Ok(reader
331                .lines()
332                .filter_map(|line| async move {
333                    match line {
334                        Ok(line) => {
335                            let line = line
336                                .strip_prefix("data: ")
337                                .or_else(|| line.strip_prefix("data:"))?;
338                            if line == "[DONE]" || line.is_empty() {
339                                None
340                            } else {
341                                match serde_json::from_str::<StreamEvent>(line) {
342                                    Ok(event) => Some(Ok(event)),
343                                    Err(error) => {
344                                        log::error!(
345                                            "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
346                                            error,
347                                            line,
348                                        );
349                                        Some(Err(anyhow!(error)))
350                                    }
351                                }
352                            }
353                        }
354                        Err(error) => Some(Err(anyhow!(error))),
355                    }
356                })
357                .boxed())
358        } else {
359            let mut body = String::new();
360            response
361                .body_mut()
362                .read_to_string(&mut body)
363                .await
364                .map_err(|e| RequestError::Other(e.into()))?;
365
366            match serde_json::from_str::<ResponseSummary>(&body) {
367                Ok(response_summary) => {
368                    let events = vec![
369                        StreamEvent::Created {
370                            response: response_summary.clone(),
371                        },
372                        StreamEvent::InProgress {
373                            response: response_summary.clone(),
374                        },
375                    ];
376
377                    let mut all_events = events;
378                    for (output_index, item) in response_summary.output.iter().enumerate() {
379                        all_events.push(StreamEvent::OutputItemAdded {
380                            output_index,
381                            sequence_number: None,
382                            item: item.clone(),
383                        });
384
385                        match item {
386                            ResponseOutputItem::Message(message) => {
387                                for content_item in &message.content {
388                                    if let Some(text) = content_item.get("text") {
389                                        if let Some(text_str) = text.as_str() {
390                                            if let Some(ref item_id) = message.id {
391                                                all_events.push(StreamEvent::OutputTextDelta {
392                                                    item_id: item_id.clone(),
393                                                    output_index,
394                                                    content_index: None,
395                                                    delta: text_str.to_string(),
396                                                });
397                                            }
398                                        }
399                                    }
400                                }
401                            }
402                            ResponseOutputItem::FunctionCall(function_call) => {
403                                if let Some(ref item_id) = function_call.id {
404                                    all_events.push(StreamEvent::FunctionCallArgumentsDone {
405                                        item_id: item_id.clone(),
406                                        output_index,
407                                        arguments: function_call.arguments.clone(),
408                                        sequence_number: None,
409                                    });
410                                }
411                            }
412                            ResponseOutputItem::Reasoning(reasoning) => {
413                                if let Some(ref item_id) = reasoning.id {
414                                    for part in &reasoning.summary {
415                                        if let ReasoningSummaryPart::SummaryText { text } = part {
416                                            all_events.push(
417                                                StreamEvent::ReasoningSummaryTextDelta {
418                                                    item_id: item_id.clone(),
419                                                    output_index,
420                                                    delta: text.clone(),
421                                                },
422                                            );
423                                        }
424                                    }
425                                }
426                            }
427                            ResponseOutputItem::Unknown => {}
428                        }
429
430                        all_events.push(StreamEvent::OutputItemDone {
431                            output_index,
432                            sequence_number: None,
433                            item: item.clone(),
434                        });
435                    }
436
437                    all_events.push(StreamEvent::Completed {
438                        response: response_summary,
439                    });
440
441                    Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
442                }
443                Err(error) => {
444                    log::error!(
445                        "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
446                        error,
447                        body,
448                    );
449                    Err(RequestError::Other(anyhow!(error)))
450                }
451            }
452        }
453    } else {
454        let mut body = String::new();
455        response
456            .body_mut()
457            .read_to_string(&mut body)
458            .await
459            .map_err(|e| RequestError::Other(e.into()))?;
460
461        Err(RequestError::HttpResponseError {
462            provider: provider_name.to_owned(),
463            status_code: response.status(),
464            body,
465            headers: response.headers().clone(),
466        })
467    }
468}