1use anyhow::{Result, anyhow};
2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
8
/// JSON body that [`stream_response`] serializes and posts to the
/// `/responses` endpoint.
///
/// Optional fields that are `None` and collections that are empty are left
/// out of the serialized payload entirely.
#[derive(Serialize, Debug)]
pub struct Request {
    /// Model identifier; always serialized.
    pub model: String,
    /// Input items for the request; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub input: Vec<ResponseInputItem>,
    /// Whether the reply should be consumed as an event stream.
    /// [`stream_response`] reads this flag to decide between line-by-line
    /// event parsing and parsing the body as a single JSON document.
    // NOTE(review): `default` only influences deserialization and this type
    // derives `Serialize` only, so the attribute looks like a no-op — confirm.
    #[serde(default)]
    pub stream: bool,
    /// Optional `temperature` setting; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,
    /// Optional `top_p` setting; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub top_p: Option<f32>,
    /// Optional `max_output_tokens` setting; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_output_tokens: Option<u64>,
    /// Optional `parallel_tool_calls` setting; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parallel_tool_calls: Option<bool>,
    /// Optional tool-choice setting; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_choice: Option<ToolChoice>,
    /// Tool definitions sent with the request; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub tools: Vec<ToolDefinition>,
    /// Optional `prompt_cache_key` value; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_cache_key: Option<String>,
    /// Optional reasoning configuration; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning: Option<ReasoningConfig>,
}
33
/// One entry of [`Request::input`].
///
/// Serialized as an internally tagged object: the `type` key holds the
/// snake_case variant name (`message`, `function_call`,
/// `function_call_output`) alongside the fields of the wrapped struct.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseInputItem {
    /// A message with a role and a list of content parts.
    Message(ResponseMessageItem),
    /// A function call identified by `call_id`, with its name and arguments.
    FunctionCall(ResponseFunctionCallItem),
    /// The output associated with a function call's `call_id`.
    FunctionCallOutput(ResponseFunctionCallOutputItem),
}
41
/// Payload of [`ResponseInputItem::Message`]: a role plus its content parts.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseMessageItem {
    /// Role attached to the message.
    pub role: Role,
    /// Content parts that make up the message, in order.
    pub content: Vec<ResponseInputContent>,
}
47
/// Payload of [`ResponseInputItem::FunctionCall`].
#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseFunctionCallItem {
    /// Identifier of the call; [`ResponseFunctionCallOutputItem`] carries a
    /// field of the same name.
    pub call_id: String,
    /// Name of the function being called.
    pub name: String,
    /// Call arguments, carried as a string.
    // NOTE(review): presumably a JSON-encoded object — confirm with callers.
    pub arguments: String,
}
54
/// Payload of [`ResponseInputItem::FunctionCallOutput`].
#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseFunctionCallOutputItem {
    /// Identifier of the function call this output belongs to.
    pub call_id: String,
    /// Output of the call, carried as a string.
    pub output: String,
}
60
/// A single content part inside a [`ResponseMessageItem`].
///
/// Internally tagged: the `type` key selects the variant using the explicit
/// names given on each variant below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ResponseInputContent {
    /// Text part, tagged `input_text`.
    #[serde(rename = "input_text")]
    Text { text: String },
    /// Image part, tagged `input_image`; the image is referenced by a URL string.
    #[serde(rename = "input_image")]
    Image { image_url: String },
    /// Text part tagged `output_text`, with optional free-form annotations.
    #[serde(rename = "output_text")]
    OutputText {
        text: String,
        /// Defaults to an empty list when the key is absent on deserialization.
        #[serde(default)]
        annotations: Vec<serde_json::Value>,
    },
    /// Refusal part, tagged `refusal`.
    #[serde(rename = "refusal")]
    Refusal { refusal: String },
}
77
/// Value of the optional [`Request::reasoning`] field.
#[derive(Serialize, Debug)]
pub struct ReasoningConfig {
    /// Reasoning effort setting; always serialized.
    pub effort: ReasoningEffort,
    /// Optional summary mode; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub summary: Option<ReasoningSummaryMode>,
}
84
/// Summary mode used in [`ReasoningConfig::summary`].
///
/// Serialized as the lowercase variant name: `auto`, `concise` or `detailed`.
#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReasoningSummaryMode {
    /// Serialized as `auto`.
    Auto,
    /// Serialized as `concise`.
    Concise,
    /// Serialized as `detailed`.
    Detailed,
}
92
/// One entry of [`Request::tools`].
///
/// Internally tagged with `type`; the only variant serializes with
/// `"type": "function"`.
#[derive(Serialize, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ToolDefinition {
    /// A function tool. Every optional field is omitted from the JSON when `None`.
    Function {
        /// Name of the function.
        name: String,
        /// Optional description of the function.
        #[serde(skip_serializing_if = "Option::is_none")]
        description: Option<String>,
        /// Optional parameters, carried as arbitrary JSON.
        // NOTE(review): presumably a JSON Schema object — confirm with callers.
        #[serde(skip_serializing_if = "Option::is_none")]
        parameters: Option<Value>,
        /// Optional `strict` flag.
        #[serde(skip_serializing_if = "Option::is_none")]
        strict: Option<bool>,
    },
}
106
/// Error payload carried by the `response.error` and `error` stream events
/// (see [`StreamEvent::Error`] and [`StreamEvent::GenericError`]).
#[derive(Deserialize, Debug)]
pub struct Error {
    /// Error message text; required when deserializing.
    pub message: String,
}
111
/// One event of a response stream.
///
/// [`stream_response`] deserializes the payload of each `data:` line into this
/// enum. The JSON `type` key selects the variant using the explicit name given
/// on each variant; any other `type` value becomes [`StreamEvent::Unknown`].
/// For non-streaming replies [`stream_response`] builds a sequence of these
/// events itself from the parsed [`ResponseSummary`].
#[derive(Deserialize, Debug)]
#[serde(tag = "type")]
pub enum StreamEvent {
    /// `response.created`: carries a snapshot of the response.
    #[serde(rename = "response.created")]
    Created { response: ResponseSummary },
    /// `response.in_progress`: carries a snapshot of the response.
    #[serde(rename = "response.in_progress")]
    InProgress { response: ResponseSummary },
    /// `response.output_item.added`: an output item at `output_index`.
    /// `sequence_number` is `None` when the key is absent.
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded {
        output_index: usize,
        #[serde(default)]
        sequence_number: Option<u64>,
        item: ResponseOutputItem,
    },
    /// `response.output_item.done`: the output item at `output_index`.
    /// `sequence_number` is `None` when the key is absent.
    #[serde(rename = "response.output_item.done")]
    OutputItemDone {
        output_index: usize,
        #[serde(default)]
        sequence_number: Option<u64>,
        item: ResponseOutputItem,
    },
    /// `response.content_part.added`: `part` is kept as untyped JSON.
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded {
        item_id: String,
        output_index: usize,
        content_index: usize,
        part: Value,
    },
    /// `response.content_part.done`: `part` is kept as untyped JSON.
    #[serde(rename = "response.content_part.done")]
    ContentPartDone {
        item_id: String,
        output_index: usize,
        content_index: usize,
        part: Value,
    },
    /// `response.output_text.delta`: a text fragment for item `item_id`.
    /// `content_index` is `None` when the key is absent.
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta {
        item_id: String,
        output_index: usize,
        #[serde(default)]
        content_index: Option<usize>,
        delta: String,
    },
    /// `response.output_text.done`: carries `text` for item `item_id`.
    /// `content_index` is `None` when the key is absent.
    #[serde(rename = "response.output_text.done")]
    OutputTextDone {
        item_id: String,
        output_index: usize,
        #[serde(default)]
        content_index: Option<usize>,
        text: String,
    },
    /// `response.reasoning_summary_part.added`: identifies a summary part by
    /// `summary_index` within item `item_id`.
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded {
        item_id: String,
        output_index: usize,
        summary_index: usize,
    },
    /// `response.reasoning_summary_text.delta`: a summary text fragment.
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta {
        item_id: String,
        output_index: usize,
        delta: String,
    },
    /// `response.reasoning_summary_text.done`: carries the summary `text`.
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone {
        item_id: String,
        output_index: usize,
        text: String,
    },
    /// `response.reasoning_summary_part.done`: identifies a summary part by
    /// `summary_index` within item `item_id`.
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone {
        item_id: String,
        output_index: usize,
        summary_index: usize,
    },
    /// `response.function_call_arguments.delta`: a fragment of the argument
    /// string. `sequence_number` is `None` when the key is absent.
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgumentsDelta {
        item_id: String,
        output_index: usize,
        delta: String,
        #[serde(default)]
        sequence_number: Option<u64>,
    },
    /// `response.function_call_arguments.done`: carries the `arguments`
    /// string. `sequence_number` is `None` when the key is absent.
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgumentsDone {
        item_id: String,
        output_index: usize,
        arguments: String,
        #[serde(default)]
        sequence_number: Option<u64>,
    },
    /// `response.completed`: carries a snapshot of the response.
    #[serde(rename = "response.completed")]
    Completed { response: ResponseSummary },
    /// `response.incomplete`: carries a snapshot of the response.
    #[serde(rename = "response.incomplete")]
    Incomplete { response: ResponseSummary },
    /// `response.failed`: carries a snapshot of the response.
    #[serde(rename = "response.failed")]
    Failed { response: ResponseSummary },
    /// `response.error`: error details nested under the `error` key.
    #[serde(rename = "response.error")]
    Error { error: Error },
    /// `error`: error details nested under the `error` key.
    // NOTE(review): deserialization fails if the provider sends the message
    // at the top level instead of under `error` — confirm the payload shape.
    #[serde(rename = "error")]
    GenericError { error: Error },
    /// Any event whose `type` does not match one of the names above.
    #[serde(other)]
    Unknown,
}
216
/// Snapshot of a response, as carried by several [`StreamEvent`] variants.
///
/// [`stream_response`] also parses the whole body of a non-streaming reply
/// into this type. Every field falls back to its default (`None` or an empty
/// list) when the key is absent.
#[derive(Deserialize, Debug, Default, Clone)]
pub struct ResponseSummary {
    /// Response identifier, if present.
    #[serde(default)]
    pub id: Option<String>,
    /// Status string, if present.
    #[serde(default)]
    pub status: Option<String>,
    /// Additional details about the status, if present.
    #[serde(default)]
    pub status_details: Option<ResponseStatusDetails>,
    /// Token usage figures, if present.
    #[serde(default)]
    pub usage: Option<ResponseUsage>,
    /// Output items; empty when the key is absent.
    #[serde(default)]
    pub output: Vec<ResponseOutputItem>,
}
230
/// Value of [`ResponseSummary::status_details`]. Every field is optional and
/// is `None` when its key is absent.
#[derive(Deserialize, Debug, Default, Clone)]
pub struct ResponseStatusDetails {
    /// `reason` string, if present.
    #[serde(default)]
    pub reason: Option<String>,
    /// Value of the JSON `type` key, if present (raw identifier because
    /// `type` is a Rust keyword).
    #[serde(default)]
    pub r#type: Option<String>,
    /// Error details kept as untyped JSON, if present.
    #[serde(default)]
    pub error: Option<Value>,
}
240
/// Token counts reported in [`ResponseSummary::usage`]. Each count is `None`
/// when its key is absent.
#[derive(Deserialize, Debug, Default, Clone)]
pub struct ResponseUsage {
    /// Value of `input_tokens`, if present.
    #[serde(default)]
    pub input_tokens: Option<u64>,
    /// Value of `output_tokens`, if present.
    #[serde(default)]
    pub output_tokens: Option<u64>,
    /// Value of `total_tokens`, if present.
    #[serde(default)]
    pub total_tokens: Option<u64>,
}
250
/// One output item of a response.
///
/// Internally tagged: the `type` key holds the snake_case variant name
/// (`message`, `function_call`, `reasoning`); any other value deserializes to
/// [`ResponseOutputItem::Unknown`].
#[derive(Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseOutputItem {
    /// A message item.
    Message(ResponseOutputMessage),
    /// A function tool call item.
    FunctionCall(ResponseFunctionToolCall),
    /// A reasoning item.
    Reasoning(ResponseReasoningItem),
    /// Any item whose `type` is not one of the names above.
    #[serde(other)]
    Unknown,
}
260
/// Payload of [`ResponseOutputItem::Reasoning`].
#[derive(Deserialize, Debug, Clone)]
pub struct ResponseReasoningItem {
    /// Item identifier, if present.
    #[serde(default)]
    pub id: Option<String>,
    /// Summary parts; empty when the key is absent.
    #[serde(default)]
    pub summary: Vec<ReasoningSummaryPart>,
}
268
/// One part of [`ResponseReasoningItem::summary`].
///
/// Internally tagged: `"type": "summary_text"` selects
/// [`ReasoningSummaryPart::SummaryText`]; any other value deserializes to
/// [`ReasoningSummaryPart::Unknown`].
#[derive(Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReasoningSummaryPart {
    /// A piece of summary text.
    SummaryText {
        text: String,
    },
    /// Any part whose `type` is not `summary_text`.
    #[serde(other)]
    Unknown,
}
278
/// Payload of [`ResponseOutputItem::Message`]. Every field falls back to its
/// default when the key is absent.
#[derive(Deserialize, Debug, Clone)]
pub struct ResponseOutputMessage {
    /// Item identifier, if present.
    #[serde(default)]
    pub id: Option<String>,
    /// Content parts kept as untyped JSON. [`stream_response`] reads the
    /// string under each part's `text` key when handling non-streaming replies.
    #[serde(default)]
    pub content: Vec<Value>,
    /// Role string, if present.
    #[serde(default)]
    pub role: Option<String>,
    /// Status string, if present.
    #[serde(default)]
    pub status: Option<String>,
}
290
/// Payload of [`ResponseOutputItem::FunctionCall`]. Every field falls back to
/// its default when the key is absent.
#[derive(Deserialize, Debug, Clone)]
pub struct ResponseFunctionToolCall {
    /// Item identifier, if present.
    #[serde(default)]
    pub id: Option<String>,
    /// Argument string; empty when the key is absent.
    #[serde(default)]
    pub arguments: String,
    /// Call identifier, if present.
    #[serde(default)]
    pub call_id: Option<String>,
    /// Function name, if present.
    #[serde(default)]
    pub name: Option<String>,
    /// Status string, if present.
    #[serde(default)]
    pub status: Option<String>,
}
304
305pub async fn stream_response(
306 client: &dyn HttpClient,
307 provider_name: &str,
308 api_url: &str,
309 api_key: &str,
310 request: Request,
311) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
312 let uri = format!("{api_url}/responses");
313 let request_builder = HttpRequest::builder()
314 .method(Method::POST)
315 .uri(uri)
316 .header("Content-Type", "application/json")
317 .header("Authorization", format!("Bearer {}", api_key.trim()));
318
319 let is_streaming = request.stream;
320 let request = request_builder
321 .body(AsyncBody::from(
322 serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
323 ))
324 .map_err(|e| RequestError::Other(e.into()))?;
325
326 let mut response = client.send(request).await?;
327 if response.status().is_success() {
328 if is_streaming {
329 let reader = BufReader::new(response.into_body());
330 Ok(reader
331 .lines()
332 .filter_map(|line| async move {
333 match line {
334 Ok(line) => {
335 let line = line
336 .strip_prefix("data: ")
337 .or_else(|| line.strip_prefix("data:"))?;
338 if line == "[DONE]" || line.is_empty() {
339 None
340 } else {
341 match serde_json::from_str::<StreamEvent>(line) {
342 Ok(event) => Some(Ok(event)),
343 Err(error) => {
344 log::error!(
345 "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
346 error,
347 line,
348 );
349 Some(Err(anyhow!(error)))
350 }
351 }
352 }
353 }
354 Err(error) => Some(Err(anyhow!(error))),
355 }
356 })
357 .boxed())
358 } else {
359 let mut body = String::new();
360 response
361 .body_mut()
362 .read_to_string(&mut body)
363 .await
364 .map_err(|e| RequestError::Other(e.into()))?;
365
366 match serde_json::from_str::<ResponseSummary>(&body) {
367 Ok(response_summary) => {
368 let events = vec![
369 StreamEvent::Created {
370 response: response_summary.clone(),
371 },
372 StreamEvent::InProgress {
373 response: response_summary.clone(),
374 },
375 ];
376
377 let mut all_events = events;
378 for (output_index, item) in response_summary.output.iter().enumerate() {
379 all_events.push(StreamEvent::OutputItemAdded {
380 output_index,
381 sequence_number: None,
382 item: item.clone(),
383 });
384
385 match item {
386 ResponseOutputItem::Message(message) => {
387 for content_item in &message.content {
388 if let Some(text) = content_item.get("text") {
389 if let Some(text_str) = text.as_str() {
390 if let Some(ref item_id) = message.id {
391 all_events.push(StreamEvent::OutputTextDelta {
392 item_id: item_id.clone(),
393 output_index,
394 content_index: None,
395 delta: text_str.to_string(),
396 });
397 }
398 }
399 }
400 }
401 }
402 ResponseOutputItem::FunctionCall(function_call) => {
403 if let Some(ref item_id) = function_call.id {
404 all_events.push(StreamEvent::FunctionCallArgumentsDone {
405 item_id: item_id.clone(),
406 output_index,
407 arguments: function_call.arguments.clone(),
408 sequence_number: None,
409 });
410 }
411 }
412 ResponseOutputItem::Reasoning(reasoning) => {
413 if let Some(ref item_id) = reasoning.id {
414 for part in &reasoning.summary {
415 if let ReasoningSummaryPart::SummaryText { text } = part {
416 all_events.push(
417 StreamEvent::ReasoningSummaryTextDelta {
418 item_id: item_id.clone(),
419 output_index,
420 delta: text.clone(),
421 },
422 );
423 }
424 }
425 }
426 }
427 ResponseOutputItem::Unknown => {}
428 }
429
430 all_events.push(StreamEvent::OutputItemDone {
431 output_index,
432 sequence_number: None,
433 item: item.clone(),
434 });
435 }
436
437 all_events.push(StreamEvent::Completed {
438 response: response_summary,
439 });
440
441 Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
442 }
443 Err(error) => {
444 log::error!(
445 "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
446 error,
447 body,
448 );
449 Err(RequestError::Other(anyhow!(error)))
450 }
451 }
452 }
453 } else {
454 let mut body = String::new();
455 response
456 .body_mut()
457 .read_to_string(&mut body)
458 .await
459 .map_err(|e| RequestError::Other(e.into()))?;
460
461 Err(RequestError::HttpResponseError {
462 provider: provider_name.to_owned(),
463 status_code: response.status(),
464 body,
465 headers: response.headers().clone(),
466 })
467 }
468}