responses.rs

  1use anyhow::{Result, anyhow};
  2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
  3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
  4use serde::{Deserialize, Serialize};
  5use serde_json::Value;
  6
  7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
  8
  9#[derive(Serialize, Debug)]
 10pub struct Request {
 11    pub model: String,
 12    #[serde(skip_serializing_if = "Vec::is_empty")]
 13    pub input: Vec<ResponseInputItem>,
 14    #[serde(default)]
 15    pub stream: bool,
 16    #[serde(skip_serializing_if = "Option::is_none")]
 17    pub temperature: Option<f32>,
 18    #[serde(skip_serializing_if = "Option::is_none")]
 19    pub top_p: Option<f32>,
 20    #[serde(skip_serializing_if = "Option::is_none")]
 21    pub max_output_tokens: Option<u64>,
 22    #[serde(skip_serializing_if = "Option::is_none")]
 23    pub parallel_tool_calls: Option<bool>,
 24    #[serde(skip_serializing_if = "Option::is_none")]
 25    pub tool_choice: Option<ToolChoice>,
 26    #[serde(skip_serializing_if = "Vec::is_empty")]
 27    pub tools: Vec<ToolDefinition>,
 28    #[serde(skip_serializing_if = "Option::is_none")]
 29    pub prompt_cache_key: Option<String>,
 30    #[serde(skip_serializing_if = "Option::is_none")]
 31    pub reasoning: Option<ReasoningConfig>,
 32}
 33
 34#[derive(Debug, Serialize, Deserialize)]
 35#[serde(tag = "type", rename_all = "snake_case")]
 36pub enum ResponseInputItem {
 37    Message(ResponseMessageItem),
 38    FunctionCall(ResponseFunctionCallItem),
 39    FunctionCallOutput(ResponseFunctionCallOutputItem),
 40}
 41
 42#[derive(Debug, Serialize, Deserialize)]
 43pub struct ResponseMessageItem {
 44    pub role: Role,
 45    pub content: Vec<ResponseInputContent>,
 46}
 47
 48#[derive(Debug, Serialize, Deserialize)]
 49pub struct ResponseFunctionCallItem {
 50    pub call_id: String,
 51    pub name: String,
 52    pub arguments: String,
 53}
 54
 55#[derive(Debug, Serialize, Deserialize)]
 56pub struct ResponseFunctionCallOutputItem {
 57    pub call_id: String,
 58    pub output: ResponseFunctionCallOutputContent,
 59}
 60
 61#[derive(Debug, Serialize, Deserialize)]
 62#[serde(untagged)]
 63pub enum ResponseFunctionCallOutputContent {
 64    List(Vec<ResponseInputContent>),
 65    Text(String),
 66}
 67
 68#[derive(Debug, Clone, Serialize, Deserialize)]
 69#[serde(tag = "type")]
 70pub enum ResponseInputContent {
 71    #[serde(rename = "input_text")]
 72    Text { text: String },
 73    #[serde(rename = "input_image")]
 74    Image { image_url: String },
 75    #[serde(rename = "output_text")]
 76    OutputText {
 77        text: String,
 78        #[serde(default)]
 79        annotations: Vec<serde_json::Value>,
 80    },
 81    #[serde(rename = "refusal")]
 82    Refusal { refusal: String },
 83}
 84
 85#[derive(Serialize, Debug)]
 86pub struct ReasoningConfig {
 87    pub effort: ReasoningEffort,
 88    #[serde(skip_serializing_if = "Option::is_none")]
 89    pub summary: Option<ReasoningSummaryMode>,
 90}
 91
 92#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
 93#[serde(rename_all = "lowercase")]
 94pub enum ReasoningSummaryMode {
 95    Auto,
 96    Concise,
 97    Detailed,
 98}
 99
100#[derive(Serialize, Debug)]
101#[serde(tag = "type", rename_all = "snake_case")]
102pub enum ToolDefinition {
103    Function {
104        name: String,
105        #[serde(skip_serializing_if = "Option::is_none")]
106        description: Option<String>,
107        #[serde(skip_serializing_if = "Option::is_none")]
108        parameters: Option<Value>,
109        #[serde(skip_serializing_if = "Option::is_none")]
110        strict: Option<bool>,
111    },
112}
113
114#[derive(Deserialize, Debug)]
115pub struct Error {
116    pub message: String,
117}
118
119#[derive(Deserialize, Debug)]
120#[serde(tag = "type")]
121pub enum StreamEvent {
122    #[serde(rename = "response.created")]
123    Created { response: ResponseSummary },
124    #[serde(rename = "response.in_progress")]
125    InProgress { response: ResponseSummary },
126    #[serde(rename = "response.output_item.added")]
127    OutputItemAdded {
128        output_index: usize,
129        #[serde(default)]
130        sequence_number: Option<u64>,
131        item: ResponseOutputItem,
132    },
133    #[serde(rename = "response.output_item.done")]
134    OutputItemDone {
135        output_index: usize,
136        #[serde(default)]
137        sequence_number: Option<u64>,
138        item: ResponseOutputItem,
139    },
140    #[serde(rename = "response.content_part.added")]
141    ContentPartAdded {
142        item_id: String,
143        output_index: usize,
144        content_index: usize,
145        part: Value,
146    },
147    #[serde(rename = "response.content_part.done")]
148    ContentPartDone {
149        item_id: String,
150        output_index: usize,
151        content_index: usize,
152        part: Value,
153    },
154    #[serde(rename = "response.output_text.delta")]
155    OutputTextDelta {
156        item_id: String,
157        output_index: usize,
158        #[serde(default)]
159        content_index: Option<usize>,
160        delta: String,
161    },
162    #[serde(rename = "response.output_text.done")]
163    OutputTextDone {
164        item_id: String,
165        output_index: usize,
166        #[serde(default)]
167        content_index: Option<usize>,
168        text: String,
169    },
170    #[serde(rename = "response.reasoning_summary_part.added")]
171    ReasoningSummaryPartAdded {
172        item_id: String,
173        output_index: usize,
174        summary_index: usize,
175    },
176    #[serde(rename = "response.reasoning_summary_text.delta")]
177    ReasoningSummaryTextDelta {
178        item_id: String,
179        output_index: usize,
180        delta: String,
181    },
182    #[serde(rename = "response.reasoning_summary_text.done")]
183    ReasoningSummaryTextDone {
184        item_id: String,
185        output_index: usize,
186        text: String,
187    },
188    #[serde(rename = "response.reasoning_summary_part.done")]
189    ReasoningSummaryPartDone {
190        item_id: String,
191        output_index: usize,
192        summary_index: usize,
193    },
194    #[serde(rename = "response.function_call_arguments.delta")]
195    FunctionCallArgumentsDelta {
196        item_id: String,
197        output_index: usize,
198        delta: String,
199        #[serde(default)]
200        sequence_number: Option<u64>,
201    },
202    #[serde(rename = "response.function_call_arguments.done")]
203    FunctionCallArgumentsDone {
204        item_id: String,
205        output_index: usize,
206        arguments: String,
207        #[serde(default)]
208        sequence_number: Option<u64>,
209    },
210    #[serde(rename = "response.completed")]
211    Completed { response: ResponseSummary },
212    #[serde(rename = "response.incomplete")]
213    Incomplete { response: ResponseSummary },
214    #[serde(rename = "response.failed")]
215    Failed { response: ResponseSummary },
216    #[serde(rename = "response.error")]
217    Error { error: Error },
218    #[serde(rename = "error")]
219    GenericError { error: Error },
220    #[serde(other)]
221    Unknown,
222}
223
224#[derive(Deserialize, Debug, Default, Clone)]
225pub struct ResponseSummary {
226    #[serde(default)]
227    pub id: Option<String>,
228    #[serde(default)]
229    pub status: Option<String>,
230    #[serde(default)]
231    pub status_details: Option<ResponseStatusDetails>,
232    #[serde(default)]
233    pub usage: Option<ResponseUsage>,
234    #[serde(default)]
235    pub output: Vec<ResponseOutputItem>,
236}
237
238#[derive(Deserialize, Debug, Default, Clone)]
239pub struct ResponseStatusDetails {
240    #[serde(default)]
241    pub reason: Option<String>,
242    #[serde(default)]
243    pub r#type: Option<String>,
244    #[serde(default)]
245    pub error: Option<Value>,
246}
247
248#[derive(Deserialize, Debug, Default, Clone)]
249pub struct ResponseUsage {
250    #[serde(default)]
251    pub input_tokens: Option<u64>,
252    #[serde(default)]
253    pub output_tokens: Option<u64>,
254    #[serde(default)]
255    pub total_tokens: Option<u64>,
256}
257
258#[derive(Deserialize, Debug, Clone)]
259#[serde(tag = "type", rename_all = "snake_case")]
260pub enum ResponseOutputItem {
261    Message(ResponseOutputMessage),
262    FunctionCall(ResponseFunctionToolCall),
263    Reasoning(ResponseReasoningItem),
264    #[serde(other)]
265    Unknown,
266}
267
268#[derive(Deserialize, Debug, Clone)]
269pub struct ResponseReasoningItem {
270    #[serde(default)]
271    pub id: Option<String>,
272    #[serde(default)]
273    pub summary: Vec<ReasoningSummaryPart>,
274}
275
276#[derive(Deserialize, Debug, Clone)]
277#[serde(tag = "type", rename_all = "snake_case")]
278pub enum ReasoningSummaryPart {
279    SummaryText {
280        text: String,
281    },
282    #[serde(other)]
283    Unknown,
284}
285
286#[derive(Deserialize, Debug, Clone)]
287pub struct ResponseOutputMessage {
288    #[serde(default)]
289    pub id: Option<String>,
290    #[serde(default)]
291    pub content: Vec<Value>,
292    #[serde(default)]
293    pub role: Option<String>,
294    #[serde(default)]
295    pub status: Option<String>,
296}
297
298#[derive(Deserialize, Debug, Clone)]
299pub struct ResponseFunctionToolCall {
300    #[serde(default)]
301    pub id: Option<String>,
302    #[serde(default)]
303    pub arguments: String,
304    #[serde(default)]
305    pub call_id: Option<String>,
306    #[serde(default)]
307    pub name: Option<String>,
308    #[serde(default)]
309    pub status: Option<String>,
310}
311
312pub async fn stream_response(
313    client: &dyn HttpClient,
314    provider_name: &str,
315    api_url: &str,
316    api_key: &str,
317    request: Request,
318) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
319    let uri = format!("{api_url}/responses");
320    let request_builder = HttpRequest::builder()
321        .method(Method::POST)
322        .uri(uri)
323        .header("Content-Type", "application/json")
324        .header("Authorization", format!("Bearer {}", api_key.trim()));
325
326    let is_streaming = request.stream;
327    let request = request_builder
328        .body(AsyncBody::from(
329            serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
330        ))
331        .map_err(|e| RequestError::Other(e.into()))?;
332
333    let mut response = client.send(request).await?;
334    if response.status().is_success() {
335        if is_streaming {
336            let reader = BufReader::new(response.into_body());
337            Ok(reader
338                .lines()
339                .filter_map(|line| async move {
340                    match line {
341                        Ok(line) => {
342                            let line = line
343                                .strip_prefix("data: ")
344                                .or_else(|| line.strip_prefix("data:"))?;
345                            if line == "[DONE]" || line.is_empty() {
346                                None
347                            } else {
348                                match serde_json::from_str::<StreamEvent>(line) {
349                                    Ok(event) => Some(Ok(event)),
350                                    Err(error) => {
351                                        log::error!(
352                                            "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
353                                            error,
354                                            line,
355                                        );
356                                        Some(Err(anyhow!(error)))
357                                    }
358                                }
359                            }
360                        }
361                        Err(error) => Some(Err(anyhow!(error))),
362                    }
363                })
364                .boxed())
365        } else {
366            let mut body = String::new();
367            response
368                .body_mut()
369                .read_to_string(&mut body)
370                .await
371                .map_err(|e| RequestError::Other(e.into()))?;
372
373            match serde_json::from_str::<ResponseSummary>(&body) {
374                Ok(response_summary) => {
375                    let events = vec![
376                        StreamEvent::Created {
377                            response: response_summary.clone(),
378                        },
379                        StreamEvent::InProgress {
380                            response: response_summary.clone(),
381                        },
382                    ];
383
384                    let mut all_events = events;
385                    for (output_index, item) in response_summary.output.iter().enumerate() {
386                        all_events.push(StreamEvent::OutputItemAdded {
387                            output_index,
388                            sequence_number: None,
389                            item: item.clone(),
390                        });
391
392                        match item {
393                            ResponseOutputItem::Message(message) => {
394                                for content_item in &message.content {
395                                    if let Some(text) = content_item.get("text") {
396                                        if let Some(text_str) = text.as_str() {
397                                            if let Some(ref item_id) = message.id {
398                                                all_events.push(StreamEvent::OutputTextDelta {
399                                                    item_id: item_id.clone(),
400                                                    output_index,
401                                                    content_index: None,
402                                                    delta: text_str.to_string(),
403                                                });
404                                            }
405                                        }
406                                    }
407                                }
408                            }
409                            ResponseOutputItem::FunctionCall(function_call) => {
410                                if let Some(ref item_id) = function_call.id {
411                                    all_events.push(StreamEvent::FunctionCallArgumentsDone {
412                                        item_id: item_id.clone(),
413                                        output_index,
414                                        arguments: function_call.arguments.clone(),
415                                        sequence_number: None,
416                                    });
417                                }
418                            }
419                            ResponseOutputItem::Reasoning(reasoning) => {
420                                if let Some(ref item_id) = reasoning.id {
421                                    for part in &reasoning.summary {
422                                        if let ReasoningSummaryPart::SummaryText { text } = part {
423                                            all_events.push(
424                                                StreamEvent::ReasoningSummaryTextDelta {
425                                                    item_id: item_id.clone(),
426                                                    output_index,
427                                                    delta: text.clone(),
428                                                },
429                                            );
430                                        }
431                                    }
432                                }
433                            }
434                            ResponseOutputItem::Unknown => {}
435                        }
436
437                        all_events.push(StreamEvent::OutputItemDone {
438                            output_index,
439                            sequence_number: None,
440                            item: item.clone(),
441                        });
442                    }
443
444                    all_events.push(StreamEvent::Completed {
445                        response: response_summary,
446                    });
447
448                    Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
449                }
450                Err(error) => {
451                    log::error!(
452                        "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
453                        error,
454                        body,
455                    );
456                    Err(RequestError::Other(anyhow!(error)))
457                }
458            }
459        }
460    } else {
461        let mut body = String::new();
462        response
463            .body_mut()
464            .read_to_string(&mut body)
465            .await
466            .map_err(|e| RequestError::Other(e.into()))?;
467
468        Err(RequestError::HttpResponseError {
469            provider: provider_name.to_owned(),
470            status_code: response.status(),
471            body,
472            headers: response.headers().clone(),
473        })
474    }
475}