1use anyhow::{Result, anyhow};
2use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
3use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::{ReasoningEffort, RequestError, Role, ToolChoice};
8
9#[derive(Serialize, Debug)]
10pub struct Request {
11 pub model: String,
12 #[serde(skip_serializing_if = "Option::is_none")]
13 pub instructions: Option<String>,
14 #[serde(skip_serializing_if = "Vec::is_empty")]
15 pub input: Vec<ResponseInputItem>,
16 #[serde(default)]
17 pub stream: bool,
18 #[serde(skip_serializing_if = "Option::is_none")]
19 pub temperature: Option<f32>,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub top_p: Option<f32>,
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub max_output_tokens: Option<u64>,
24 #[serde(skip_serializing_if = "Option::is_none")]
25 pub parallel_tool_calls: Option<bool>,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub tool_choice: Option<ToolChoice>,
28 #[serde(skip_serializing_if = "Vec::is_empty")]
29 pub tools: Vec<ToolDefinition>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub prompt_cache_key: Option<String>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub reasoning: Option<ReasoningConfig>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub store: Option<bool>,
36}
37
38#[derive(Debug, Serialize, Deserialize)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum ResponseInputItem {
41 Message(ResponseMessageItem),
42 FunctionCall(ResponseFunctionCallItem),
43 FunctionCallOutput(ResponseFunctionCallOutputItem),
44}
45
46#[derive(Debug, Serialize, Deserialize)]
47pub struct ResponseMessageItem {
48 pub role: Role,
49 pub content: Vec<ResponseInputContent>,
50}
51
52#[derive(Debug, Serialize, Deserialize)]
53pub struct ResponseFunctionCallItem {
54 pub call_id: String,
55 pub name: String,
56 pub arguments: String,
57}
58
59#[derive(Debug, Serialize, Deserialize)]
60pub struct ResponseFunctionCallOutputItem {
61 pub call_id: String,
62 pub output: ResponseFunctionCallOutputContent,
63}
64
65#[derive(Debug, Serialize, Deserialize)]
66#[serde(untagged)]
67pub enum ResponseFunctionCallOutputContent {
68 List(Vec<ResponseInputContent>),
69 Text(String),
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(tag = "type")]
74pub enum ResponseInputContent {
75 #[serde(rename = "input_text")]
76 Text { text: String },
77 #[serde(rename = "input_image")]
78 Image { image_url: String },
79 #[serde(rename = "output_text")]
80 OutputText {
81 text: String,
82 #[serde(default)]
83 annotations: Vec<serde_json::Value>,
84 },
85 #[serde(rename = "refusal")]
86 Refusal { refusal: String },
87}
88
89#[derive(Serialize, Debug)]
90pub struct ReasoningConfig {
91 pub effort: ReasoningEffort,
92 #[serde(skip_serializing_if = "Option::is_none")]
93 pub summary: Option<ReasoningSummaryMode>,
94}
95
96#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
97#[serde(rename_all = "lowercase")]
98pub enum ReasoningSummaryMode {
99 Auto,
100 Concise,
101 Detailed,
102}
103
104#[derive(Serialize, Debug)]
105#[serde(tag = "type", rename_all = "snake_case")]
106pub enum ToolDefinition {
107 Function {
108 name: String,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 description: Option<String>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 parameters: Option<Value>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 strict: Option<bool>,
115 },
116}
117
118#[derive(Deserialize, Debug)]
119pub struct Error {
120 pub message: String,
121}
122
123#[derive(Deserialize, Debug)]
124#[serde(tag = "type")]
125pub enum StreamEvent {
126 #[serde(rename = "response.created")]
127 Created { response: ResponseSummary },
128 #[serde(rename = "response.in_progress")]
129 InProgress { response: ResponseSummary },
130 #[serde(rename = "response.output_item.added")]
131 OutputItemAdded {
132 output_index: usize,
133 #[serde(default)]
134 sequence_number: Option<u64>,
135 item: ResponseOutputItem,
136 },
137 #[serde(rename = "response.output_item.done")]
138 OutputItemDone {
139 output_index: usize,
140 #[serde(default)]
141 sequence_number: Option<u64>,
142 item: ResponseOutputItem,
143 },
144 #[serde(rename = "response.content_part.added")]
145 ContentPartAdded {
146 item_id: String,
147 output_index: usize,
148 content_index: usize,
149 part: Value,
150 },
151 #[serde(rename = "response.content_part.done")]
152 ContentPartDone {
153 item_id: String,
154 output_index: usize,
155 content_index: usize,
156 part: Value,
157 },
158 #[serde(rename = "response.output_text.delta")]
159 OutputTextDelta {
160 item_id: String,
161 output_index: usize,
162 #[serde(default)]
163 content_index: Option<usize>,
164 delta: String,
165 },
166 #[serde(rename = "response.output_text.done")]
167 OutputTextDone {
168 item_id: String,
169 output_index: usize,
170 #[serde(default)]
171 content_index: Option<usize>,
172 text: String,
173 },
174 #[serde(rename = "response.reasoning_summary_part.added")]
175 ReasoningSummaryPartAdded {
176 item_id: String,
177 output_index: usize,
178 summary_index: usize,
179 },
180 #[serde(rename = "response.reasoning_summary_text.delta")]
181 ReasoningSummaryTextDelta {
182 item_id: String,
183 output_index: usize,
184 delta: String,
185 },
186 #[serde(rename = "response.reasoning_summary_text.done")]
187 ReasoningSummaryTextDone {
188 item_id: String,
189 output_index: usize,
190 text: String,
191 },
192 #[serde(rename = "response.reasoning_summary_part.done")]
193 ReasoningSummaryPartDone {
194 item_id: String,
195 output_index: usize,
196 summary_index: usize,
197 },
198 #[serde(rename = "response.function_call_arguments.delta")]
199 FunctionCallArgumentsDelta {
200 item_id: String,
201 output_index: usize,
202 delta: String,
203 #[serde(default)]
204 sequence_number: Option<u64>,
205 },
206 #[serde(rename = "response.function_call_arguments.done")]
207 FunctionCallArgumentsDone {
208 item_id: String,
209 output_index: usize,
210 arguments: String,
211 #[serde(default)]
212 sequence_number: Option<u64>,
213 },
214 #[serde(rename = "response.completed")]
215 Completed { response: ResponseSummary },
216 #[serde(rename = "response.incomplete")]
217 Incomplete { response: ResponseSummary },
218 #[serde(rename = "response.failed")]
219 Failed { response: ResponseSummary },
220 #[serde(rename = "response.error")]
221 Error { error: Error },
222 #[serde(rename = "error")]
223 GenericError { error: Error },
224 #[serde(other)]
225 Unknown,
226}
227
228#[derive(Deserialize, Debug, Default, Clone)]
229pub struct ResponseSummary {
230 #[serde(default)]
231 pub id: Option<String>,
232 #[serde(default)]
233 pub status: Option<String>,
234 #[serde(default)]
235 pub status_details: Option<ResponseStatusDetails>,
236 #[serde(default)]
237 pub usage: Option<ResponseUsage>,
238 #[serde(default)]
239 pub output: Vec<ResponseOutputItem>,
240}
241
242#[derive(Deserialize, Debug, Default, Clone)]
243pub struct ResponseStatusDetails {
244 #[serde(default)]
245 pub reason: Option<String>,
246 #[serde(default)]
247 pub r#type: Option<String>,
248 #[serde(default)]
249 pub error: Option<Value>,
250}
251
252#[derive(Deserialize, Debug, Default, Clone)]
253pub struct ResponseUsage {
254 #[serde(default)]
255 pub input_tokens: Option<u64>,
256 #[serde(default)]
257 pub output_tokens: Option<u64>,
258 #[serde(default)]
259 pub total_tokens: Option<u64>,
260}
261
262#[derive(Deserialize, Debug, Clone)]
263#[serde(tag = "type", rename_all = "snake_case")]
264pub enum ResponseOutputItem {
265 Message(ResponseOutputMessage),
266 FunctionCall(ResponseFunctionToolCall),
267 Reasoning(ResponseReasoningItem),
268 #[serde(other)]
269 Unknown,
270}
271
272#[derive(Deserialize, Debug, Clone)]
273pub struct ResponseReasoningItem {
274 #[serde(default)]
275 pub id: Option<String>,
276 #[serde(default)]
277 pub summary: Vec<ReasoningSummaryPart>,
278}
279
280#[derive(Deserialize, Debug, Clone)]
281#[serde(tag = "type", rename_all = "snake_case")]
282pub enum ReasoningSummaryPart {
283 SummaryText {
284 text: String,
285 },
286 #[serde(other)]
287 Unknown,
288}
289
290#[derive(Deserialize, Debug, Clone)]
291pub struct ResponseOutputMessage {
292 #[serde(default)]
293 pub id: Option<String>,
294 #[serde(default)]
295 pub content: Vec<Value>,
296 #[serde(default)]
297 pub role: Option<String>,
298 #[serde(default)]
299 pub status: Option<String>,
300}
301
302#[derive(Deserialize, Debug, Clone)]
303pub struct ResponseFunctionToolCall {
304 #[serde(default)]
305 pub id: Option<String>,
306 #[serde(default)]
307 pub arguments: String,
308 #[serde(default)]
309 pub call_id: Option<String>,
310 #[serde(default)]
311 pub name: Option<String>,
312 #[serde(default)]
313 pub status: Option<String>,
314}
315
316pub async fn stream_response(
317 client: &dyn HttpClient,
318 provider_name: &str,
319 api_url: &str,
320 api_key: &str,
321 request: Request,
322 extra_headers: Vec<(String, String)>,
323) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
324 let uri = format!("{api_url}/responses");
325 let mut request_builder = HttpRequest::builder()
326 .method(Method::POST)
327 .uri(uri)
328 .header("Content-Type", "application/json")
329 .header("Authorization", format!("Bearer {}", api_key.trim()));
330 for (name, value) in &extra_headers {
331 request_builder = request_builder.header(name.as_str(), value.as_str());
332 }
333
334 let is_streaming = request.stream;
335 let request = request_builder
336 .body(AsyncBody::from(
337 serde_json::to_string(&request).map_err(|e| RequestError::Other(e.into()))?,
338 ))
339 .map_err(|e| RequestError::Other(e.into()))?;
340
341 let mut response = client.send(request).await?;
342 if response.status().is_success() {
343 if is_streaming {
344 let reader = BufReader::new(response.into_body());
345 Ok(reader
346 .lines()
347 .filter_map(|line| async move {
348 match line {
349 Ok(line) => {
350 let line = line
351 .strip_prefix("data: ")
352 .or_else(|| line.strip_prefix("data:"))?;
353 if line == "[DONE]" || line.is_empty() {
354 None
355 } else {
356 match serde_json::from_str::<StreamEvent>(line) {
357 Ok(event) => Some(Ok(event)),
358 Err(error) => {
359 log::error!(
360 "Failed to parse OpenAI responses stream event: `{}`\nResponse: `{}`",
361 error,
362 line,
363 );
364 Some(Err(anyhow!(error)))
365 }
366 }
367 }
368 }
369 Err(error) => Some(Err(anyhow!(error))),
370 }
371 })
372 .boxed())
373 } else {
374 let mut body = String::new();
375 response
376 .body_mut()
377 .read_to_string(&mut body)
378 .await
379 .map_err(|e| RequestError::Other(e.into()))?;
380
381 match serde_json::from_str::<ResponseSummary>(&body) {
382 Ok(response_summary) => {
383 let events = vec![
384 StreamEvent::Created {
385 response: response_summary.clone(),
386 },
387 StreamEvent::InProgress {
388 response: response_summary.clone(),
389 },
390 ];
391
392 let mut all_events = events;
393 for (output_index, item) in response_summary.output.iter().enumerate() {
394 all_events.push(StreamEvent::OutputItemAdded {
395 output_index,
396 sequence_number: None,
397 item: item.clone(),
398 });
399
400 match item {
401 ResponseOutputItem::Message(message) => {
402 for content_item in &message.content {
403 if let Some(text) = content_item.get("text") {
404 if let Some(text_str) = text.as_str() {
405 if let Some(ref item_id) = message.id {
406 all_events.push(StreamEvent::OutputTextDelta {
407 item_id: item_id.clone(),
408 output_index,
409 content_index: None,
410 delta: text_str.to_string(),
411 });
412 }
413 }
414 }
415 }
416 }
417 ResponseOutputItem::FunctionCall(function_call) => {
418 if let Some(ref item_id) = function_call.id {
419 all_events.push(StreamEvent::FunctionCallArgumentsDone {
420 item_id: item_id.clone(),
421 output_index,
422 arguments: function_call.arguments.clone(),
423 sequence_number: None,
424 });
425 }
426 }
427 ResponseOutputItem::Reasoning(reasoning) => {
428 if let Some(ref item_id) = reasoning.id {
429 for part in &reasoning.summary {
430 if let ReasoningSummaryPart::SummaryText { text } = part {
431 all_events.push(
432 StreamEvent::ReasoningSummaryTextDelta {
433 item_id: item_id.clone(),
434 output_index,
435 delta: text.clone(),
436 },
437 );
438 }
439 }
440 }
441 }
442 ResponseOutputItem::Unknown => {}
443 }
444
445 all_events.push(StreamEvent::OutputItemDone {
446 output_index,
447 sequence_number: None,
448 item: item.clone(),
449 });
450 }
451
452 all_events.push(StreamEvent::Completed {
453 response: response_summary,
454 });
455
456 Ok(futures::stream::iter(all_events.into_iter().map(Ok)).boxed())
457 }
458 Err(error) => {
459 log::error!(
460 "Failed to parse OpenAI non-streaming response: `{}`\nResponse: `{}`",
461 error,
462 body,
463 );
464 Err(RequestError::Other(anyhow!(error)))
465 }
466 }
467 }
468 } else {
469 let mut body = String::new();
470 response
471 .body_mut()
472 .read_to_string(&mut body)
473 .await
474 .map_err(|e| RequestError::Other(e.into()))?;
475
476 Err(RequestError::HttpResponseError {
477 provider: provider_name.to_owned(),
478 status_code: response.status(),
479 body,
480 headers: response.headers().clone(),
481 })
482 }
483}