1use anyhow::{Result, anyhow};
2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
8
9#[derive(Serialize, Debug)]
10pub struct Request {
11 pub model: String,
12 #[serde(skip_serializing_if = "Vec::is_empty")]
13 pub input: Vec<ResponseInputItem>,
14 #[serde(default)]
15 pub stream: bool,
16 #[serde(skip_serializing_if = "Option::is_none")]
17 pub temperature: Option<f32>,
18 #[serde(skip_serializing_if = "Option::is_none")]
19 pub top_p: Option<f32>,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub max_output_tokens: Option<u64>,
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub parallel_tool_calls: Option<bool>,
24 #[serde(skip_serializing_if = "Option::is_none")]
25 pub tool_choice: Option<ToolChoice>,
26 #[serde(skip_serializing_if = "Vec::is_empty")]
27 pub tools: Vec<ToolDefinition>,
28 #[serde(skip_serializing_if = "Option::is_none")]
29 pub prompt_cache_key: Option<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub reasoning: Option<ReasoningConfig>,
32}
33
34#[derive(Debug, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum ResponseInputItem {
37 Message(ResponseMessageItem),
38 FunctionCall(ResponseFunctionCallItem),
39 FunctionCallOutput(ResponseFunctionCallOutputItem),
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct ResponseMessageItem {
44 pub role: Role,
45 pub content: Vec<ResponseInputContent>,
46}
47
48#[derive(Debug, Serialize, Deserialize)]
49pub struct ResponseFunctionCallItem {
50 pub call_id: String,
51 pub name: String,
52 pub arguments: String,
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56pub struct ResponseFunctionCallOutputItem {
57 pub call_id: String,
58 pub output: ResponseFunctionCallOutputContent,
59}
60
61#[derive(Debug, Serialize, Deserialize)]
62#[serde(untagged)]
63pub enum ResponseFunctionCallOutputContent {
64 List(Vec<ResponseInputContent>),
65 Text(String),
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69#[serde(tag = "type")]
70pub enum ResponseInputContent {
71 #[serde(rename = "input_text")]
72 Text { text: String },
73 #[serde(rename = "input_image")]
74 Image { image_url: String },
75 #[serde(rename = "output_text")]
76 OutputText {
77 text: String,
78 #[serde(default)]
79 annotations: Vec<serde_json::Value>,
80 },
81 #[serde(rename = "refusal")]
82 Refusal { refusal: String },
83}
84
85#[derive(Serialize, Debug)]
86pub struct ReasoningConfig {
87 pub effort: ReasoningEffort,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub summary: Option<ReasoningSummaryMode>,
90}
91
92#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
93#[serde(rename_all = "lowercase")]
94pub enum ReasoningSummaryMode {
95 Auto,
96 Concise,
97 Detailed,
98}
99
100#[derive(Serialize, Debug)]
101#[serde(tag = "type", rename_all = "snake_case")]
102pub enum ToolDefinition {
103 Function {
104 name: String,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 description: Option<String>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 parameters: Option<Value>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 strict: Option<bool>,
111 },
112}
113
114#[derive(Deserialize, Debug)]
115pub struct Error {
116 pub message: String,
117}
118
119#[derive(Deserialize, Debug)]
120#[serde(tag = "type")]
121pub enum StreamEvent {
122 #[serde(rename = "response.created")]
123 Created { response: ResponseSummary },
124 #[serde(rename = "response.in_progress")]
125 InProgress { response: ResponseSummary },
126 #[serde(rename = "response.output_item.added")]
127 OutputItemAdded {
128 output_index: usize,
129 #[serde(default)]
130 sequence_number: Option<u64>,
131 item: ResponseOutputItem,
132 },
133 #[serde(rename = "response.output_item.done")]
134 OutputItemDone {
135 output_index: usize,
136 #[serde(default)]
137 sequence_number: Option<u64>,
138 item: ResponseOutputItem,
139 },
140 #[serde(rename = "response.content_part.added")]
141 ContentPartAdded {
142 item_id: String,
143 output_index: usize,
144 content_index: usize,
145 part: Value,
146 },
147 #[serde(rename = "response.content_part.done")]
148 ContentPartDone {
149 item_id: String,
150 output_index: usize,
151 content_index: usize,
152 part: Value,
153 },
154 #[serde(rename = "response.output_text.delta")]
155 OutputTextDelta {
156 item_id: String,
157 output_index: usize,
158 #[serde(default)]
159 content_index: Option<usize>,
160 delta: String,
161 },
162 #[serde(rename = "response.output_text.done")]
163 OutputTextDone {
164 item_id: String,
165 output_index: usize,
166 #[serde(default)]
167 content_index: Option<usize>,
168 text: String,
169 },
170 #[serde(rename = "response.reasoning_summary_part.added")]
171 ReasoningSummaryPartAdded {
172 item_id: String,
173 output_index: usize,
174 summary_index: usize,
175 },
176 #[serde(rename = "response.reasoning_summary_text.delta")]
177 ReasoningSummaryTextDelta {
178 item_id: String,
179 output_index: usize,
180 delta: String,
181 },
182 #[serde(rename = "response.reasoning_summary_text.done")]
183 ReasoningSummaryTextDone {
184 item_id: String,
185 output_index: usize,
186 text: String,
187 },
188 #[serde(rename = "response.reasoning_summary_part.done")]
189 ReasoningSummaryPartDone {
190 item_id: String,
191 output_index: usize,
192 summary_index: usize,
193 },
194 #[serde(rename = "response.function_call_arguments.delta")]
195 FunctionCallArgumentsDelta {
196 item_id: String,
197 output_index: usize,
198 delta: String,
199 #[serde(default)]
200 sequence_number: Option<u64>,
201 },
202 #[serde(rename = "response.function_call_arguments.done")]
203 FunctionCallArgumentsDone {
204 item_id: String,
205 output_index: usize,
206 arguments: String,
207 #[serde(default)]
208 sequence_number: Option<u64>,
209 },
210 #[serde(rename = "response.completed")]
211 Completed { response: ResponseSummary },
212 #[serde(rename = "response.incomplete")]
213 Incomplete { response: ResponseSummary },
214 #[serde(rename = "response.failed")]
215 Failed { response: ResponseSummary },
216 #[serde(rename = "response.error")]
217 Error { error: Error },
218 #[serde(rename = "error")]
219 GenericError { error: Error },
220 #[serde(other)]
221 Unknown,
222}
223
224#[derive(Deserialize, Debug, Default, Clone)]
225pub struct ResponseSummary {
226 #[serde(default)]
227 pub id: Option<String>,
228 #[serde(default)]
229 pub status: Option<String>,
230 #[serde(default)]
231 pub status_details: Option<ResponseStatusDetails>,
232 #[serde(default)]
233 pub usage: Option<ResponseUsage>,
234 #[serde(default)]
235 pub output: Vec<ResponseOutputItem>,
236}
237
238#[derive(Deserialize, Debug, Default, Clone)]
239pub struct ResponseStatusDetails {
240 #[serde(default)]
241 pub reason: Option<String>,
242 #[serde(default)]
243 pub r#type: Option<String>,
244 #[serde(default)]
245 pub error: Option<Value>,
246}
247
248#[derive(Deserialize, Debug, Default, Clone)]
249pub struct ResponseUsage {
250 #[serde(default)]
251 pub input_tokens: Option<u64>,
252 #[serde(default)]
253 pub output_tokens: Option<u64>,
254 #[serde(default)]
255 pub total_tokens: Option<u64>,
256}
257
258#[derive(Deserialize, Debug, Clone)]
259#[serde(tag = "type", rename_all = "snake_case")]
260pub enum ResponseOutputItem {
261 Message(ResponseOutputMessage),
262 FunctionCall(ResponseFunctionToolCall),
263 Reasoning(ResponseReasoningItem),
264 #[serde(other)]
265 Unknown,
266}
267
268#[derive(Deserialize, Debug, Clone)]
269pub struct ResponseReasoningItem {
270 #[serde(default)]
271 pub id: Option<String>,
272 #[serde(default)]
273 pub summary: Vec<ReasoningSummaryPart>,
274}
275
276#[derive(Deserialize, Debug, Clone)]
277#[serde(tag = "type", rename_all = "snake_case")]
278pub enum ReasoningSummaryPart {
279 SummaryText {
280 text: String,
281 },
282 #[serde(other)]
283 Unknown,
284}
285
286#[derive(Deserialize, Debug, Clone)]
287pub struct ResponseOutputMessage {
288 #[serde(default)]
289 pub id: Option<String>,
290 #[serde(default)]
291 pub content: Vec<Value>,
292 #[serde(default)]
293 pub role: Option<String>,
294 #[serde(default)]
295 pub status: Option<String>,
296}
297
298#[derive(Deserialize, Debug, Clone)]
299pub struct ResponseFunctionToolCall {
300 #[serde(default)]
301 pub id: Option<String>,
302 #[serde(default)]
303 pub arguments: String,
304 #[serde(default)]
305 pub call_id: Option<String>,
306 #[serde(default)]
307 pub name: Option<String>,
308 #[serde(default)]
309 pub status: Option<String>,
310}
311
312pub async fn stream_response(
313 client: &dyn HttpClient,
314 provider_name: &str,
315 api_url: &str,
316 api_key: &str,
317 request: Request,
318) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
319 let uri = format!("{api_url}/responses");
320 let request_builder = HttpRequest::builder()
321 .method(Method::POST)
322 .uri(uri)
323 .header("Content-Type", "application/json")
324 .header("Authorization", format!("Bearer {}", api_key.trim()));
325
326 let is_streaming = request.stream;
327 let request = request_builder
328 .body(AsyncBody::from(
329 serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
330 ))
331 .map_err(|e| RequestError::Other(e.into()))?;
332
333 let mut response = client.send(request).await?;
334 if response.status().is_success() {
335 if is_streaming {
336 let reader = BufReader::new(response.into_body());
337 Ok(reader
338 .lines()
339 .filter_map(|line| async move {
340 match line {
341 Ok(line) => {
342 let line = line
343 .strip_prefix("data: ")
344 .or_else(|| line.strip_prefix("data:"))?;
345 if line == "[DONE]" || line.is_empty() {
346 None
347 } else {
348 match serde_json::from_str::<StreamEvent>(line) {
349 Ok(event) => Some(Ok(event)),
350 Err(error) => {
351 log::error!(
352 "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
353 error,
354 line,
355 );
356 Some(Err(anyhow!(error)))
357 }
358 }
359 }
360 }
361 Err(error) => Some(Err(anyhow!(error))),
362 }
363 })
364 .boxed())
365 } else {
366 let mut body = String::new();
367 response
368 .body_mut()
369 .read_to_string(&mut body)
370 .await
371 .map_err(|e| RequestError::Other(e.into()))?;
372
373 match serde_json::from_str::<ResponseSummary>(&body) {
374 Ok(response_summary) => {
375 let events = vec![
376 StreamEvent::Created {
377 response: response_summary.clone(),
378 },
379 StreamEvent::InProgress {
380 response: response_summary.clone(),
381 },
382 ];
383
384 let mut all_events = events;
385 for (output_index, item) in response_summary.output.iter().enumerate() {
386 all_events.push(StreamEvent::OutputItemAdded {
387 output_index,
388 sequence_number: None,
389 item: item.clone(),
390 });
391
392 match item {
393 ResponseOutputItem::Message(message) => {
394 for content_item in &message.content {
395 if let Some(text) = content_item.get("text") {
396 if let Some(text_str) = text.as_str() {
397 if let Some(ref item_id) = message.id {
398 all_events.push(StreamEvent::OutputTextDelta {
399 item_id: item_id.clone(),
400 output_index,
401 content_index: None,
402 delta: text_str.to_string(),
403 });
404 }
405 }
406 }
407 }
408 }
409 ResponseOutputItem::FunctionCall(function_call) => {
410 if let Some(ref item_id) = function_call.id {
411 all_events.push(StreamEvent::FunctionCallArgumentsDone {
412 item_id: item_id.clone(),
413 output_index,
414 arguments: function_call.arguments.clone(),
415 sequence_number: None,
416 });
417 }
418 }
419 ResponseOutputItem::Reasoning(reasoning) => {
420 if let Some(ref item_id) = reasoning.id {
421 for part in &reasoning.summary {
422 if let ReasoningSummaryPart::SummaryText { text } = part {
423 all_events.push(
424 StreamEvent::ReasoningSummaryTextDelta {
425 item_id: item_id.clone(),
426 output_index,
427 delta: text.clone(),
428 },
429 );
430 }
431 }
432 }
433 }
434 ResponseOutputItem::Unknown => {}
435 }
436
437 all_events.push(StreamEvent::OutputItemDone {
438 output_index,
439 sequence_number: None,
440 item: item.clone(),
441 });
442 }
443
444 all_events.push(StreamEvent::Completed {
445 response: response_summary,
446 });
447
448 Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
449 }
450 Err(error) => {
451 log::error!(
452 "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
453 error,
454 body,
455 );
456 Err(RequestError::Other(anyhow!(error)))
457 }
458 }
459 }
460 } else {
461 let mut body = String::new();
462 response
463 .body_mut()
464 .read_to_string(&mut body)
465 .await
466 .map_err(|e| RequestError::Other(e.into()))?;
467
468 Err(RequestError::HttpResponseError {
469 provider: provider_name.to_owned(),
470 status_code: response.status(),
471 body,
472 headers: response.headers().clone(),
473 })
474 }
475}