// completion.rs

   1use anyhow::{Result, anyhow};
   2use collections::HashMap;
   3use futures::{Stream, StreamExt};
   4use language_model_core::{
   5    LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelImage,
   6    LanguageModelRequest, LanguageModelRequestMessage, LanguageModelToolChoice,
   7    LanguageModelToolResultContent, LanguageModelToolUse, LanguageModelToolUseId, MessageContent,
   8    Role, StopReason, TokenUsage,
   9    util::{fix_streamed_json, parse_tool_arguments},
  10};
  11use std::pin::Pin;
  12use std::sync::Arc;
  13
  14use crate::responses::{
  15    Request as ResponseRequest, ResponseFunctionCallItem, ResponseFunctionCallOutputContent,
  16    ResponseFunctionCallOutputItem, ResponseInputContent, ResponseInputItem, ResponseMessageItem,
  17    ResponseOutputItem, ResponseSummary as ResponsesSummary, ResponseUsage as ResponsesUsage,
  18    StreamEvent as ResponsesStreamEvent,
  19};
  20use crate::{
  21    FunctionContent, FunctionDefinition, ImageUrl, MessagePart, Model, ReasoningEffort,
  22    ResponseStreamEvent, ToolCall, ToolCallContent,
  23};
  24
/// Converts a provider-agnostic [`LanguageModelRequest`] into an OpenAI
/// Chat Completions [`crate::Request`] for `model_id`.
///
/// Adjacent text/image parts with the same role are coalesced into a single
/// request message; tool uses are attached to the trailing assistant
/// message's `tool_calls`, and tool results become `Tool`-role messages.
pub fn into_open_ai(
    request: LanguageModelRequest,
    model_id: &str,
    supports_parallel_tool_calls: bool,
    supports_prompt_cache_key: bool,
    max_output_tokens: Option<u64>,
    reasoning_effort: Option<ReasoningEffort>,
) -> crate::Request {
    // o1-* models are requested without streaming — presumably they don't
    // support it; confirm before relying on this elsewhere.
    let stream = !model_id.starts_with("o1-");

    let mut messages = Vec::new();
    for message in request.messages {
        for content in message.content {
            match content {
                // Thinking text is forwarded as plain text; Chat Completions
                // has no dedicated "thinking" input part.
                MessageContent::Text(text) | MessageContent::Thinking { text, .. } => {
                    let should_add = if message.role == Role::User {
                        // Including whitespace-only user messages can cause error with OpenAI compatible APIs
                        // See https://github.com/zed-industries/zed/issues/40097
                        !text.trim().is_empty()
                    } else {
                        !text.is_empty()
                    };
                    if should_add {
                        add_message_content_part(
                            MessagePart::Text { text },
                            message.role,
                            &mut messages,
                        );
                    }
                }
                // Redacted thinking has no textual representation to send.
                MessageContent::RedactedThinking(_) => {}
                MessageContent::Image(image) => {
                    add_message_content_part(
                        MessagePart::Image {
                            image_url: ImageUrl {
                                url: image.to_base64_url(),
                                detail: None,
                            },
                        },
                        message.role,
                        &mut messages,
                    );
                }
                MessageContent::ToolUse(tool_use) => {
                    let tool_call = ToolCall {
                        id: tool_use.id.to_string(),
                        content: ToolCallContent::Function {
                            function: FunctionContent {
                                name: tool_use.name.to_string(),
                                // Serialization failure degrades to "" rather
                                // than aborting request construction.
                                arguments: serde_json::to_string(&tool_use.input)
                                    .unwrap_or_default(),
                            },
                        },
                    };

                    // Append to the trailing assistant message when there is
                    // one; otherwise open an assistant message that carries
                    // only tool calls.
                    if let Some(crate::RequestMessage::Assistant { tool_calls, .. }) =
                        messages.last_mut()
                    {
                        tool_calls.push(tool_call);
                    } else {
                        messages.push(crate::RequestMessage::Assistant {
                            content: None,
                            tool_calls: vec![tool_call],
                        });
                    }
                }
                MessageContent::ToolResult(tool_result) => {
                    let content = match &tool_result.content {
                        LanguageModelToolResultContent::Text(text) => {
                            vec![MessagePart::Text {
                                text: text.to_string(),
                            }]
                        }
                        LanguageModelToolResultContent::Image(image) => {
                            vec![MessagePart::Image {
                                image_url: ImageUrl {
                                    url: image.to_base64_url(),
                                    detail: None,
                                },
                            }]
                        }
                    };

                    messages.push(crate::RequestMessage::Tool {
                        content: content.into(),
                        tool_call_id: tool_result.tool_use_id.to_string(),
                    });
                }
            }
        }
    }

    crate::Request {
        model: model_id.into(),
        messages,
        stream,
        stream_options: if stream {
            Some(crate::StreamOptions::default())
        } else {
            None
        },
        stop: request.stop,
        // Default to temperature 1.0 when the caller didn't specify one.
        temperature: request.temperature.or(Some(1.0)),
        max_completion_tokens: max_output_tokens,
        // Only sent when the model supports it AND tools are in play;
        // presumably some OpenAI-compatible endpoints reject the field
        // otherwise — note into_open_ai_response gates this differently.
        parallel_tool_calls: if supports_parallel_tool_calls && !request.tools.is_empty() {
            Some(supports_parallel_tool_calls)
        } else {
            None
        },
        prompt_cache_key: if supports_prompt_cache_key {
            request.thread_id
        } else {
            None
        },
        tools: request
            .tools
            .into_iter()
            .map(|tool| crate::ToolDefinition::Function {
                function: FunctionDefinition {
                    name: tool.name,
                    description: Some(tool.description),
                    parameters: Some(tool.input_schema),
                },
            })
            .collect(),
        tool_choice: request.tool_choice.map(|choice| match choice {
            LanguageModelToolChoice::Auto => crate::ToolChoice::Auto,
            LanguageModelToolChoice::Any => crate::ToolChoice::Required,
            LanguageModelToolChoice::None => crate::ToolChoice::None,
        }),
        reasoning_effort,
    }
}
 158
 159pub fn into_open_ai_response(
 160    request: LanguageModelRequest,
 161    model_id: &str,
 162    supports_parallel_tool_calls: bool,
 163    supports_prompt_cache_key: bool,
 164    max_output_tokens: Option<u64>,
 165    reasoning_effort: Option<ReasoningEffort>,
 166) -> ResponseRequest {
 167    let stream = !model_id.starts_with("o1-");
 168
 169    let LanguageModelRequest {
 170        thread_id,
 171        prompt_id: _,
 172        intent: _,
 173        messages,
 174        tools,
 175        tool_choice,
 176        stop: _,
 177        temperature,
 178        thinking_allowed: _,
 179        thinking_effort: _,
 180        speed: _,
 181    } = request;
 182
 183    let mut input_items = Vec::new();
 184    for (index, message) in messages.into_iter().enumerate() {
 185        append_message_to_response_items(message, index, &mut input_items);
 186    }
 187
 188    let tools: Vec<_> = tools
 189        .into_iter()
 190        .map(|tool| crate::responses::ToolDefinition::Function {
 191            name: tool.name,
 192            description: Some(tool.description),
 193            parameters: Some(tool.input_schema),
 194            strict: None,
 195        })
 196        .collect();
 197
 198    ResponseRequest {
 199        model: model_id.into(),
 200        instructions: None,
 201        input: input_items,
 202        stream,
 203        temperature,
 204        top_p: None,
 205        max_output_tokens,
 206        parallel_tool_calls: if tools.is_empty() {
 207            None
 208        } else {
 209            Some(supports_parallel_tool_calls)
 210        },
 211        tool_choice: tool_choice.map(|choice| match choice {
 212            LanguageModelToolChoice::Auto => crate::ToolChoice::Auto,
 213            LanguageModelToolChoice::Any => crate::ToolChoice::Required,
 214            LanguageModelToolChoice::None => crate::ToolChoice::None,
 215        }),
 216        tools,
 217        prompt_cache_key: if supports_prompt_cache_key {
 218            thread_id
 219        } else {
 220            None
 221        },
 222        reasoning: reasoning_effort.map(|effort| crate::responses::ReasoningConfig {
 223            effort,
 224            summary: Some(crate::responses::ReasoningSummaryMode::Auto),
 225        }),
 226        store: None,
 227    }
 228}
 229
 230fn append_message_to_response_items(
 231    message: LanguageModelRequestMessage,
 232    index: usize,
 233    input_items: &mut Vec<ResponseInputItem>,
 234) {
 235    let mut content_parts: Vec<ResponseInputContent> = Vec::new();
 236
 237    for content in message.content {
 238        match content {
 239            MessageContent::Text(text) => {
 240                push_response_text_part(&message.role, text, &mut content_parts);
 241            }
 242            MessageContent::Thinking { text, .. } => {
 243                push_response_text_part(&message.role, text, &mut content_parts);
 244            }
 245            MessageContent::RedactedThinking(_) => {}
 246            MessageContent::Image(image) => {
 247                push_response_image_part(&message.role, image, &mut content_parts);
 248            }
 249            MessageContent::ToolUse(tool_use) => {
 250                flush_response_parts(&message.role, index, &mut content_parts, input_items);
 251                let call_id = tool_use.id.to_string();
 252                input_items.push(ResponseInputItem::FunctionCall(ResponseFunctionCallItem {
 253                    call_id,
 254                    name: tool_use.name.to_string(),
 255                    arguments: tool_use.raw_input,
 256                }));
 257            }
 258            MessageContent::ToolResult(tool_result) => {
 259                flush_response_parts(&message.role, index, &mut content_parts, input_items);
 260                input_items.push(ResponseInputItem::FunctionCallOutput(
 261                    ResponseFunctionCallOutputItem {
 262                        call_id: tool_result.tool_use_id.to_string(),
 263                        output: match tool_result.content {
 264                            LanguageModelToolResultContent::Text(text) => {
 265                                ResponseFunctionCallOutputContent::Text(text.to_string())
 266                            }
 267                            LanguageModelToolResultContent::Image(image) => {
 268                                ResponseFunctionCallOutputContent::List(vec![
 269                                    ResponseInputContent::Image {
 270                                        image_url: image.to_base64_url(),
 271                                    },
 272                                ])
 273                            }
 274                        },
 275                    },
 276                ));
 277            }
 278        }
 279    }
 280
 281    flush_response_parts(&message.role, index, &mut content_parts, input_items);
 282}
 283
 284fn push_response_text_part(
 285    role: &Role,
 286    text: impl Into<String>,
 287    parts: &mut Vec<ResponseInputContent>,
 288) {
 289    let text = text.into();
 290    if text.trim().is_empty() {
 291        return;
 292    }
 293
 294    match role {
 295        Role::Assistant => parts.push(ResponseInputContent::OutputText {
 296            text,
 297            annotations: Vec::new(),
 298        }),
 299        _ => parts.push(ResponseInputContent::Text { text }),
 300    }
 301}
 302
 303fn push_response_image_part(
 304    role: &Role,
 305    image: LanguageModelImage,
 306    parts: &mut Vec<ResponseInputContent>,
 307) {
 308    match role {
 309        Role::Assistant => parts.push(ResponseInputContent::OutputText {
 310            text: "[image omitted]".to_string(),
 311            annotations: Vec::new(),
 312        }),
 313        _ => parts.push(ResponseInputContent::Image {
 314            image_url: image.to_base64_url(),
 315        }),
 316    }
 317}
 318
 319fn flush_response_parts(
 320    role: &Role,
 321    _index: usize,
 322    parts: &mut Vec<ResponseInputContent>,
 323    input_items: &mut Vec<ResponseInputItem>,
 324) {
 325    if parts.is_empty() {
 326        return;
 327    }
 328
 329    let item = ResponseInputItem::Message(ResponseMessageItem {
 330        role: match role {
 331            Role::User => crate::Role::User,
 332            Role::Assistant => crate::Role::Assistant,
 333            Role::System => crate::Role::System,
 334        },
 335        content: parts.clone(),
 336    });
 337
 338    input_items.push(item);
 339    parts.clear();
 340}
 341
 342fn add_message_content_part(
 343    new_part: MessagePart,
 344    role: Role,
 345    messages: &mut Vec<crate::RequestMessage>,
 346) {
 347    match (role, messages.last_mut()) {
 348        (Role::User, Some(crate::RequestMessage::User { content }))
 349        | (
 350            Role::Assistant,
 351            Some(crate::RequestMessage::Assistant {
 352                content: Some(content),
 353                ..
 354            }),
 355        )
 356        | (Role::System, Some(crate::RequestMessage::System { content, .. })) => {
 357            content.push_part(new_part);
 358        }
 359        _ => {
 360            messages.push(match role {
 361                Role::User => crate::RequestMessage::User {
 362                    content: crate::MessageContent::from(vec![new_part]),
 363                },
 364                Role::Assistant => crate::RequestMessage::Assistant {
 365                    content: Some(crate::MessageContent::from(vec![new_part])),
 366                    tool_calls: Vec::new(),
 367                },
 368                Role::System => crate::RequestMessage::System {
 369                    content: crate::MessageContent::from(vec![new_part]),
 370                },
 371            });
 372        }
 373    }
 374}
 375
/// Maps OpenAI Chat Completions stream events into
/// [`LanguageModelCompletionEvent`]s, accumulating partial tool calls.
pub struct OpenAiEventMapper {
    // Tool-call deltas arrive keyed by index; this buffers them until the
    // finish reason finalizes the call.
    tool_calls_by_index: HashMap<usize, RawToolCall>,
}
 379
impl OpenAiEventMapper {
    /// Creates a mapper with no buffered tool calls.
    pub fn new() -> Self {
        Self {
            tool_calls_by_index: HashMap::default(),
        }
    }

    /// Adapts a raw OpenAI event stream into completion events, forwarding
    /// stream-level errors as [`LanguageModelCompletionError`]s.
    pub fn map_stream(
        mut self,
        events: Pin<Box<dyn Send + Stream<Item = Result<ResponseStreamEvent>>>>,
    ) -> impl Stream<Item = Result<LanguageModelCompletionEvent, LanguageModelCompletionError>>
    {
        events.flat_map(move |event| {
            futures::stream::iter(match event {
                Ok(event) => self.map_event(event),
                Err(error) => vec![Err(LanguageModelCompletionError::from(anyhow!(error)))],
            })
        })
    }

    /// Converts a single stream event into zero or more completion events.
    ///
    /// Emits usage updates, thinking/text deltas, provisional tool-use events
    /// (`is_input_complete: false`) while arguments stream in, and a final
    /// `Stop` event once a finish reason arrives.
    pub fn map_event(
        &mut self,
        event: ResponseStreamEvent,
    ) -> Vec<Result<LanguageModelCompletionEvent, LanguageModelCompletionError>> {
        let mut events = Vec::new();
        if let Some(usage) = event.usage {
            events.push(Ok(LanguageModelCompletionEvent::UsageUpdate(TokenUsage {
                input_tokens: usage.prompt_tokens,
                output_tokens: usage.completion_tokens,
                // Cache token counts are not populated from this API.
                cache_creation_input_tokens: 0,
                cache_read_input_tokens: 0,
            })));
        }

        // Only the first choice is considered; any others are ignored.
        let Some(choice) = event.choices.first() else {
            return events;
        };

        if let Some(delta) = choice.delta.as_ref() {
            if let Some(reasoning_content) = delta.reasoning_content.clone() {
                if !reasoning_content.is_empty() {
                    events.push(Ok(LanguageModelCompletionEvent::Thinking {
                        text: reasoning_content,
                        signature: None,
                    }));
                }
            }
            if let Some(content) = delta.content.clone() {
                if !content.is_empty() {
                    events.push(Ok(LanguageModelCompletionEvent::Text(content)));
                }
            }

            if let Some(tool_calls) = delta.tool_calls.as_ref() {
                for tool_call in tool_calls {
                    // Deltas belonging to the same logical call share an index.
                    let entry = self.tool_calls_by_index.entry(tool_call.index).or_default();

                    if let Some(tool_id) = tool_call.id.clone() {
                        entry.id = tool_id;
                    }

                    if let Some(function) = tool_call.function.as_ref() {
                        if let Some(name) = function.name.clone() {
                            entry.name = name;
                        }

                        if let Some(arguments) = function.arguments.clone() {
                            entry.arguments.push_str(&arguments);
                        }
                    }

                    // Surface a provisional tool-use event whenever the
                    // accumulated (repaired) JSON parses, so callers can
                    // render arguments as they stream in.
                    if !entry.id.is_empty() && !entry.name.is_empty() {
                        if let Ok(input) = serde_json::from_str::<serde_json::Value>(
                            &fix_streamed_json(&entry.arguments),
                        ) {
                            events.push(Ok(LanguageModelCompletionEvent::ToolUse(
                                LanguageModelToolUse {
                                    id: entry.id.clone().into(),
                                    name: entry.name.as_str().into(),
                                    is_input_complete: false,
                                    input,
                                    raw_input: entry.arguments.clone(),
                                    thought_signature: None,
                                },
                            )));
                        }
                    }
                }
            }
        }

        match choice.finish_reason.as_deref() {
            Some("stop") => {
                events.push(Ok(LanguageModelCompletionEvent::Stop(StopReason::EndTurn)));
            }
            Some("tool_calls") => {
                // Finalize every buffered call with fully-parsed arguments.
                // Note: a parse failure is reported as an event (Ok), not a
                // stream error, so the turn can continue.
                events.extend(self.tool_calls_by_index.drain().map(|(_, tool_call)| {
                    match parse_tool_arguments(&tool_call.arguments) {
                        Ok(input) => Ok(LanguageModelCompletionEvent::ToolUse(
                            LanguageModelToolUse {
                                id: tool_call.id.clone().into(),
                                name: tool_call.name.as_str().into(),
                                is_input_complete: true,
                                input,
                                raw_input: tool_call.arguments.clone(),
                                thought_signature: None,
                            },
                        )),
                        Err(error) => Ok(LanguageModelCompletionEvent::ToolUseJsonParseError {
                            id: tool_call.id.into(),
                            tool_name: tool_call.name.into(),
                            raw_input: tool_call.arguments.clone().into(),
                            json_parse_error: error.to_string(),
                        }),
                    }
                }));

                events.push(Ok(LanguageModelCompletionEvent::Stop(StopReason::ToolUse)));
            }
            Some(stop_reason) => {
                // Unknown finish reasons are logged and treated as a normal
                // end of turn.
                log::error!("Unexpected OpenAI stop_reason: {stop_reason:?}",);
                events.push(Ok(LanguageModelCompletionEvent::Stop(StopReason::EndTurn)));
            }
            None => {}
        }

        events
    }
}
 509
/// Accumulator for a tool call streamed as incremental deltas.
#[derive(Default)]
struct RawToolCall {
    // Provider-assigned tool call id; may arrive in a later delta.
    id: String,
    // Function name; filled in as deltas arrive.
    name: String,
    // Concatenated JSON argument fragments.
    arguments: String,
}
 516
/// Maps OpenAI Responses-API stream events into
/// [`LanguageModelCompletionEvent`]s.
pub struct OpenAiResponseEventMapper {
    // In-flight function calls keyed by output item id, so argument deltas
    // can be appended to the right call.
    function_calls_by_item: HashMap<String, PendingResponseFunctionCall>,
    // Stop reason to report when the response finishes (e.g. ToolUse after a
    // function call completed).
    pending_stop_reason: Option<StopReason>,
}
 521
/// Accumulator for a Responses-API function call whose arguments stream in
/// as deltas.
#[derive(Default)]
struct PendingResponseFunctionCall {
    // Identifier reported back to the model alongside the call's output.
    call_id: String,
    // Function name, shared cheaply with every emitted event.
    name: Arc<str>,
    // Concatenated JSON argument fragments.
    arguments: String,
}
 528
impl OpenAiResponseEventMapper {
    /// Creates a mapper with no in-flight function calls.
    pub fn new() -> Self {
        Self {
            function_calls_by_item: HashMap::default(),
            pending_stop_reason: None,
        }
    }

    /// Adapts a raw Responses-API event stream into completion events,
    /// forwarding stream-level errors as [`LanguageModelCompletionError`]s.
    pub fn map_stream(
        mut self,
        events: Pin<Box<dyn Send + Stream<Item = Result<ResponsesStreamEvent>>>>,
    ) -> impl Stream<Item = Result<LanguageModelCompletionEvent, LanguageModelCompletionError>>
    {
        events.flat_map(move |event| {
            futures::stream::iter(match event {
                Ok(event) => self.map_event(event),
                Err(error) => vec![Err(LanguageModelCompletionError::from(anyhow!(error)))],
            })
        })
    }

    /// Converts a single Responses-API stream event into zero or more
    /// completion events.
    pub fn map_event(
        &mut self,
        event: ResponsesStreamEvent,
    ) -> Vec<Result<LanguageModelCompletionEvent, LanguageModelCompletionError>> {
        match event {
            ResponsesStreamEvent::OutputItemAdded { item, .. } => {
                let mut events = Vec::new();

                match &item {
                    ResponseOutputItem::Message(message) => {
                        if let Some(id) = &message.id {
                            events.push(Ok(LanguageModelCompletionEvent::StartMessage {
                                message_id: id.clone(),
                            }));
                        }
                    }
                    // Register the call so later argument deltas can be
                    // correlated by item id. Items without an id are dropped.
                    ResponseOutputItem::FunctionCall(function_call) => {
                        if let Some(item_id) = function_call.id.clone() {
                            // Prefer the explicit call_id; fall back to the
                            // item id.
                            let call_id = function_call
                                .call_id
                                .clone()
                                .or_else(|| function_call.id.clone())
                                .unwrap_or_else(|| item_id.clone());
                            let entry = PendingResponseFunctionCall {
                                call_id,
                                name: Arc::<str>::from(
                                    function_call.name.clone().unwrap_or_default(),
                                ),
                                arguments: function_call.arguments.clone(),
                            };
                            self.function_calls_by_item.insert(item_id, entry);
                        }
                    }
                    ResponseOutputItem::Reasoning(_) | ResponseOutputItem::Unknown => {}
                }
                events
            }
            ResponsesStreamEvent::ReasoningSummaryTextDelta { delta, .. } => {
                if delta.is_empty() {
                    Vec::new()
                } else {
                    vec![Ok(LanguageModelCompletionEvent::Thinking {
                        text: delta,
                        signature: None,
                    })]
                }
            }
            ResponsesStreamEvent::OutputTextDelta { delta, .. } => {
                if delta.is_empty() {
                    Vec::new()
                } else {
                    vec![Ok(LanguageModelCompletionEvent::Text(delta))]
                }
            }
            // Accumulate argument fragments; emit a provisional tool-use
            // event whenever the repaired JSON parses, so arguments can be
            // rendered while still streaming.
            ResponsesStreamEvent::FunctionCallArgumentsDelta { item_id, delta, .. } => {
                if let Some(entry) = self.function_calls_by_item.get_mut(&item_id) {
                    entry.arguments.push_str(&delta);
                    if let Ok(input) = serde_json::from_str::<serde_json::Value>(
                        &fix_streamed_json(&entry.arguments),
                    ) {
                        return vec![Ok(LanguageModelCompletionEvent::ToolUse(
                            LanguageModelToolUse {
                                id: LanguageModelToolUseId::from(entry.call_id.clone()),
                                name: entry.name.clone(),
                                is_input_complete: false,
                                input,
                                raw_input: entry.arguments.clone(),
                                thought_signature: None,
                            },
                        ))];
                    }
                }
                Vec::new()
            }
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id, arguments, ..
            } => {
                if let Some(mut entry) = self.function_calls_by_item.remove(&item_id) {
                    // The done event's full argument string wins over the
                    // accumulated deltas when it is non-empty.
                    if !arguments.is_empty() {
                        entry.arguments = arguments;
                    }
                    let raw_input = entry.arguments.clone();
                    // Record that this turn ended in a tool call so the final
                    // Completed/Incomplete event reports StopReason::ToolUse.
                    self.pending_stop_reason = Some(StopReason::ToolUse);
                    match parse_tool_arguments(&entry.arguments) {
                        Ok(input) => {
                            vec![Ok(LanguageModelCompletionEvent::ToolUse(
                                LanguageModelToolUse {
                                    id: LanguageModelToolUseId::from(entry.call_id.clone()),
                                    name: entry.name.clone(),
                                    is_input_complete: true,
                                    input,
                                    raw_input,
                                    thought_signature: None,
                                },
                            ))]
                        }
                        Err(error) => {
                            // Parse failures are surfaced as events (Ok), not
                            // stream errors, so the turn can continue.
                            vec![Ok(LanguageModelCompletionEvent::ToolUseJsonParseError {
                                id: LanguageModelToolUseId::from(entry.call_id.clone()),
                                tool_name: entry.name.clone(),
                                raw_input: Arc::<str>::from(raw_input),
                                json_parse_error: error.to_string(),
                            })]
                        }
                    }
                } else {
                    Vec::new()
                }
            }
            ResponsesStreamEvent::Completed { response } => {
                self.handle_completion(response, StopReason::EndTurn)
            }
            ResponsesStreamEvent::Incomplete { response } => {
                let reason = response
                    .status_details
                    .as_ref()
                    .and_then(|details| details.reason.as_deref());
                let stop_reason = match reason {
                    Some("max_output_tokens") => StopReason::MaxTokens,
                    Some("content_filter") => {
                        self.pending_stop_reason = Some(StopReason::Refusal);
                        StopReason::Refusal
                    }
                    _ => self
                        .pending_stop_reason
                        .take()
                        .unwrap_or(StopReason::EndTurn),
                };

                // NOTE(review): for "content_filter" the pending_stop_reason
                // set above is never taken, and for "max_output_tokens" a
                // previously-set pending reason is neither taken nor cleared
                // — both carry over into the next response. Confirm this is
                // intended before changing.
                let mut events = Vec::new();
                if self.pending_stop_reason.is_none() {
                    events.extend(self.emit_tool_calls_from_output(&response.output));
                }
                if let Some(usage) = response.usage.as_ref() {
                    events.push(Ok(LanguageModelCompletionEvent::UsageUpdate(
                        token_usage_from_response_usage(usage),
                    )));
                }
                events.push(Ok(LanguageModelCompletionEvent::Stop(stop_reason)));
                events
            }
            ResponsesStreamEvent::Failed { response } => {
                let message = response
                    .status_details
                    .and_then(|details| details.error)
                    .map(|error| error.to_string())
                    .unwrap_or_else(|| "response failed".to_string());
                vec![Err(LanguageModelCompletionError::Other(anyhow!(message)))]
            }
            ResponsesStreamEvent::Error { error }
            | ResponsesStreamEvent::GenericError { error } => {
                vec![Err(LanguageModelCompletionError::Other(anyhow!(
                    error.message
                )))]
            }
            // Separate consecutive reasoning summaries with a blank line.
            ResponsesStreamEvent::ReasoningSummaryPartAdded { summary_index, .. } => {
                if summary_index > 0 {
                    vec![Ok(LanguageModelCompletionEvent::Thinking {
                        text: "\n\n".to_string(),
                        signature: None,
                    })]
                } else {
                    Vec::new()
                }
            }
            ResponsesStreamEvent::OutputTextDone { .. }
            | ResponsesStreamEvent::OutputItemDone { .. }
            | ResponsesStreamEvent::ContentPartAdded { .. }
            | ResponsesStreamEvent::ContentPartDone { .. }
            | ResponsesStreamEvent::ReasoningSummaryTextDone { .. }
            | ResponsesStreamEvent::ReasoningSummaryPartDone { .. }
            | ResponsesStreamEvent::Created { .. }
            | ResponsesStreamEvent::InProgress { .. }
            | ResponsesStreamEvent::Unknown => Vec::new(),
        }
    }

    /// Emits final usage, any tool calls still present only in the response
    /// summary, and the terminal `Stop` event.
    fn handle_completion(
        &mut self,
        response: ResponsesSummary,
        default_reason: StopReason,
    ) -> Vec<Result<LanguageModelCompletionEvent, LanguageModelCompletionError>> {
        let mut events = Vec::new();

        // Only re-emit tool calls from the summary if none were already
        // finalized during streaming (which would have set a pending reason).
        if self.pending_stop_reason.is_none() {
            events.extend(self.emit_tool_calls_from_output(&response.output));
        }

        if let Some(usage) = response.usage.as_ref() {
            events.push(Ok(LanguageModelCompletionEvent::UsageUpdate(
                token_usage_from_response_usage(usage),
            )));
        }

        let stop_reason = self.pending_stop_reason.take().unwrap_or(default_reason);
        events.push(Ok(LanguageModelCompletionEvent::Stop(stop_reason)));
        events
    }

    /// Converts function-call items found in the final response output into
    /// completed tool-use events; sets `pending_stop_reason` to `ToolUse`.
    fn emit_tool_calls_from_output(
        &mut self,
        output: &[ResponseOutputItem],
    ) -> Vec<Result<LanguageModelCompletionEvent, LanguageModelCompletionError>> {
        let mut events = Vec::new();
        for item in output {
            if let ResponseOutputItem::FunctionCall(function_call) = item {
                let Some(call_id) = function_call
                    .call_id
                    .clone()
                    .or_else(|| function_call.id.clone())
                else {
                    log::error!(
                        "Function call item missing both call_id and id: {:?}",
                        function_call
                    );
                    continue;
                };
                let name: Arc<str> = Arc::from(function_call.name.clone().unwrap_or_default());
                let arguments = &function_call.arguments;
                self.pending_stop_reason = Some(StopReason::ToolUse);
                match parse_tool_arguments(arguments) {
                    Ok(input) => {
                        events.push(Ok(LanguageModelCompletionEvent::ToolUse(
                            LanguageModelToolUse {
                                id: LanguageModelToolUseId::from(call_id.clone()),
                                name: name.clone(),
                                is_input_complete: true,
                                input,
                                raw_input: arguments.clone(),
                                thought_signature: None,
                            },
                        )));
                    }
                    Err(error) => {
                        events.push(Ok(LanguageModelCompletionEvent::ToolUseJsonParseError {
                            id: LanguageModelToolUseId::from(call_id.clone()),
                            tool_name: name.clone(),
                            raw_input: Arc::<str>::from(arguments.clone()),
                            json_parse_error: error.to_string(),
                        }));
                    }
                }
            }
        }
        events
    }
}
 797
 798fn token_usage_from_response_usage(usage: &ResponsesUsage) -> TokenUsage {
 799    TokenUsage {
 800        input_tokens: usage.input_tokens.unwrap_or_default(),
 801        output_tokens: usage.output_tokens.unwrap_or_default(),
 802        cache_creation_input_tokens: 0,
 803        cache_read_input_tokens: 0,
 804    }
 805}
 806
 807pub fn collect_tiktoken_messages(
 808    request: LanguageModelRequest,
 809) -> Vec<tiktoken_rs::ChatCompletionRequestMessage> {
 810    request
 811        .messages
 812        .into_iter()
 813        .map(|message| tiktoken_rs::ChatCompletionRequestMessage {
 814            role: match message.role {
 815                Role::User => "user".into(),
 816                Role::Assistant => "assistant".into(),
 817                Role::System => "system".into(),
 818            },
 819            content: Some(message.string_contents()),
 820            name: None,
 821            function_call: None,
 822        })
 823        .collect::<Vec<_>>()
 824}
 825
 826/// Count tokens for an OpenAI model. This is synchronous; callers should spawn
 827/// it on a background thread if needed.
 828pub fn count_open_ai_tokens(request: LanguageModelRequest, model: Model) -> Result<u64> {
 829    let messages = collect_tiktoken_messages(request);
 830    match model {
 831        Model::Custom { max_tokens, .. } => {
 832            let model = if max_tokens >= 100_000 {
 833                // If the max tokens is 100k or more, it likely uses the o200k_base tokenizer
 834                "gpt-4o"
 835            } else {
 836                // Otherwise fallback to gpt-4, since only cl100k_base and o200k_base are
 837                // supported with this tiktoken method
 838                "gpt-4"
 839            };
 840            tiktoken_rs::num_tokens_from_messages(model, &messages)
 841        }
 842        // Currently supported by tiktoken_rs
 843        // Sometimes tiktoken-rs is behind on model support. If that is the case, make a new branch
 844        // arm with an override. We enumerate all supported models here so that we can check if new
 845        // models are supported yet or not.
 846        Model::ThreePointFiveTurbo
 847        | Model::Four
 848        | Model::FourTurbo
 849        | Model::FourOmniMini
 850        | Model::FourPointOneNano
 851        | Model::O1
 852        | Model::O3
 853        | Model::O3Mini
 854        | Model::O4Mini
 855        | Model::Five
 856        | Model::FiveCodex
 857        | Model::FiveMini
 858        | Model::FiveNano => tiktoken_rs::num_tokens_from_messages(model.id(), &messages),
 859        // GPT-5.1, 5.2, 5.2-codex, 5.3-codex, 5.4, and 5.4-pro don't have dedicated tiktoken support; use gpt-5 tokenizer
 860        Model::FivePointOne
 861        | Model::FivePointTwo
 862        | Model::FivePointTwoCodex
 863        | Model::FivePointThreeCodex
 864        | Model::FivePointFour
 865        | Model::FivePointFourPro => tiktoken_rs::num_tokens_from_messages("gpt-5", &messages),
 866    }
 867    .map(|tokens| tokens as u64)
 868}
 869
 870#[cfg(test)]
 871mod tests {
 872    use crate::responses::{
 873        ReasoningSummaryPart, ResponseFunctionToolCall, ResponseOutputItem, ResponseOutputMessage,
 874        ResponseReasoningItem, ResponseStatusDetails, ResponseSummary, ResponseUsage,
 875        StreamEvent as ResponsesStreamEvent,
 876    };
 877    use futures::{StreamExt, executor::block_on};
 878    use language_model_core::{
 879        LanguageModelImage, LanguageModelRequestMessage, LanguageModelRequestTool,
 880        LanguageModelToolResult, LanguageModelToolResultContent, LanguageModelToolUse,
 881        LanguageModelToolUseId, SharedString,
 882    };
 883    use pretty_assertions::assert_eq;
 884    use serde_json::json;
 885
 886    use super::*;
 887
 888    fn map_response_events(events: Vec<ResponsesStreamEvent>) -> Vec<LanguageModelCompletionEvent> {
 889        block_on(async {
 890            OpenAiResponseEventMapper::new()
 891                .map_stream(Box::pin(futures::stream::iter(events.into_iter().map(Ok))))
 892                .collect::<Vec<_>>()
 893                .await
 894                .into_iter()
 895                .map(Result::unwrap)
 896                .collect()
 897        })
 898    }
 899
 900    fn response_item_message(id: &str) -> ResponseOutputItem {
 901        ResponseOutputItem::Message(ResponseOutputMessage {
 902            id: Some(id.to_string()),
 903            role: Some("assistant".to_string()),
 904            status: Some("in_progress".to_string()),
 905            content: vec![],
 906        })
 907    }
 908
 909    fn response_item_function_call(id: &str, args: Option<&str>) -> ResponseOutputItem {
 910        ResponseOutputItem::FunctionCall(ResponseFunctionToolCall {
 911            id: Some(id.to_string()),
 912            status: Some("in_progress".to_string()),
 913            name: Some("get_weather".to_string()),
 914            call_id: Some("call_123".to_string()),
 915            arguments: args.map(|s| s.to_string()).unwrap_or_default(),
 916        })
 917    }
 918
 919    #[test]
 920    fn tiktoken_rs_support() {
 921        let request = LanguageModelRequest {
 922            thread_id: None,
 923            prompt_id: None,
 924            intent: None,
 925            messages: vec![LanguageModelRequestMessage {
 926                role: Role::User,
 927                content: vec![MessageContent::Text("message".into())],
 928                cache: false,
 929                reasoning_details: None,
 930            }],
 931            tools: vec![],
 932            tool_choice: None,
 933            stop: vec![],
 934            temperature: None,
 935            thinking_allowed: true,
 936            thinking_effort: None,
 937            speed: None,
 938        };
 939
 940        // Validate that all models are supported by tiktoken-rs
 941        for model in <Model as strum::IntoEnumIterator>::iter() {
 942            let count = count_open_ai_tokens(request.clone(), model).unwrap();
 943            assert!(count > 0);
 944        }
 945    }
 946
    /// A message item plus a text delta should map to `StartMessage` then
    /// `Text`, and the `Completed` event's usage should surface as a
    /// `UsageUpdate` before the final `Stop(EndTurn)`.
    #[test]
    fn responses_stream_maps_text_and_usage() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_message("msg_123"),
            },
            ResponsesStreamEvent::OutputTextDelta {
                item_id: "msg_123".into(),
                output_index: 0,
                content_index: Some(0),
                delta: "Hello".into(),
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary {
                    usage: Some(ResponseUsage {
                        input_tokens: Some(5),
                        output_tokens: Some(3),
                        total_tokens: Some(8),
                    }),
                    ..Default::default()
                },
            },
        ];

        let mapped = map_response_events(events);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::StartMessage { ref message_id } if message_id == "msg_123"
        ));
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::Text(ref text) if text == "Hello"
        ));
        assert!(matches!(
            mapped[2],
            LanguageModelCompletionEvent::UsageUpdate(TokenUsage {
                input_tokens: 5,
                output_tokens: 3,
                ..
            })
        ));
        assert!(matches!(
            mapped[3],
            LanguageModelCompletionEvent::Stop(StopReason::EndTurn)
        ));
    }
 995
    /// End-to-end serialization check for `into_open_ai_response`: a request
    /// exercising every message shape (system text, user text + image,
    /// assistant text + tool use, tool result) plus tools, tool_choice,
    /// max tokens, prompt-cache key, and reasoning effort must serialize to
    /// exactly the expected Responses API JSON payload.
    #[test]
    fn into_open_ai_response_builds_complete_payload() {
        let tool_call_id = LanguageModelToolUseId::from("call-42");
        let tool_input = json!({ "city": "Boston" });
        let tool_arguments = serde_json::to_string(&tool_input).unwrap();
        let tool_use = LanguageModelToolUse {
            id: tool_call_id.clone(),
            name: Arc::from("get_weather"),
            raw_input: tool_arguments.clone(),
            input: tool_input,
            is_input_complete: true,
            thought_signature: None,
        };
        let tool_result = LanguageModelToolResult {
            tool_use_id: tool_call_id,
            tool_name: Arc::from("get_weather"),
            is_error: false,
            content: LanguageModelToolResultContent::Text(Arc::from("Sunny")),
            output: Some(json!({ "forecast": "Sunny" })),
        };
        let user_image = LanguageModelImage {
            source: SharedString::from("aGVsbG8="),
            size: None,
        };
        let expected_image_url = user_image.to_base64_url();

        let request = LanguageModelRequest {
            thread_id: Some("thread-123".into()),
            prompt_id: None,
            intent: None,
            messages: vec![
                LanguageModelRequestMessage {
                    role: Role::System,
                    content: vec![MessageContent::Text("System context".into())],
                    cache: false,
                    reasoning_details: None,
                },
                LanguageModelRequestMessage {
                    role: Role::User,
                    content: vec![
                        MessageContent::Text("Please check the weather.".into()),
                        MessageContent::Image(user_image),
                    ],
                    cache: false,
                    reasoning_details: None,
                },
                LanguageModelRequestMessage {
                    role: Role::Assistant,
                    content: vec![
                        MessageContent::Text("Looking that up.".into()),
                        MessageContent::ToolUse(tool_use),
                    ],
                    cache: false,
                    reasoning_details: None,
                },
                LanguageModelRequestMessage {
                    role: Role::Assistant,
                    content: vec![MessageContent::ToolResult(tool_result)],
                    cache: false,
                    reasoning_details: None,
                },
            ],
            tools: vec![LanguageModelRequestTool {
                name: "get_weather".into(),
                description: "Fetches the weather".into(),
                input_schema: json!({ "type": "object" }),
                use_input_streaming: false,
            }],
            tool_choice: Some(LanguageModelToolChoice::Any),
            stop: vec!["<STOP>".into()],
            temperature: None,
            thinking_allowed: false,
            thinking_effort: None,
            speed: None,
        };

        let response = into_open_ai_response(
            request,
            "custom-model",
            true,
            true,
            Some(2048),
            Some(ReasoningEffort::Low),
        );

        // Compare the serialized form against the Responses API wire schema.
        // Note: tool use and tool result become top-level `function_call` /
        // `function_call_output` items, and `LanguageModelToolChoice::Any`
        // serializes as "required".
        let serialized = serde_json::to_value(&response).unwrap();
        let expected = json!({
            "model": "custom-model",
            "input": [
                {
                    "type": "message",
                    "role": "system",
                    "content": [
                        { "type": "input_text", "text": "System context" }
                    ]
                },
                {
                    "type": "message",
                    "role": "user",
                    "content": [
                        { "type": "input_text", "text": "Please check the weather." },
                        { "type": "input_image", "image_url": expected_image_url }
                    ]
                },
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        { "type": "output_text", "text": "Looking that up.", "annotations": [] }
                    ]
                },
                {
                    "type": "function_call",
                    "call_id": "call-42",
                    "name": "get_weather",
                    "arguments": tool_arguments
                },
                {
                    "type": "function_call_output",
                    "call_id": "call-42",
                    "output": "Sunny"
                }
            ],
            "stream": true,
            "max_output_tokens": 2048,
            "parallel_tool_calls": true,
            "tool_choice": "required",
            "tools": [
                {
                    "type": "function",
                    "name": "get_weather",
                    "description": "Fetches the weather",
                    "parameters": { "type": "object" }
                }
            ],
            "prompt_cache_key": "thread-123",
            "reasoning": { "effort": "low", "summary": "auto" }
        });

        assert_eq!(serialized, expected);
    }
1137
    /// Streamed function-call arguments should produce a partial `ToolUse`
    /// (incomplete input), then a complete `ToolUse` carrying the call_id,
    /// name, and full raw arguments, then `Stop(ToolUse)`.
    #[test]
    fn responses_stream_maps_tool_calls() {
        let events = vec![
            // Item arrives with a partial argument prefix...
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("{\"city\":\"Bos")),
            },
            // ...a delta completes the JSON...
            ResponsesStreamEvent::FunctionCallArgumentsDelta {
                item_id: "item_fn".into(),
                output_index: 0,
                delta: "ton\"}".into(),
                sequence_number: None,
            },
            // ...and Done carries the full argument string.
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 0,
                arguments: "{\"city\":\"Boston\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert_eq!(mapped.len(), 3);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                is_input_complete: false,
                ..
            })
        ));
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                ref id,
                ref name,
                ref raw_input,
                is_input_complete: true,
                ..
            }) if id.to_string() == "call_123"
                && name.as_ref() == "get_weather"
                && raw_input == "{\"city\":\"Boston\"}"
        ));
        assert!(matches!(
            mapped[2],
            LanguageModelCompletionEvent::Stop(StopReason::ToolUse)
        ));
    }
1189
    /// An `Incomplete` event whose status reason is `max_output_tokens` must
    /// still emit the usage update and map the stop reason to
    /// `StopReason::MaxTokens`.
    #[test]
    fn responses_stream_uses_max_tokens_stop_reason() {
        let events = vec![ResponsesStreamEvent::Incomplete {
            response: ResponseSummary {
                status_details: Some(ResponseStatusDetails {
                    reason: Some("max_output_tokens".into()),
                    r#type: Some("incomplete".into()),
                    error: None,
                }),
                usage: Some(ResponseUsage {
                    input_tokens: Some(10),
                    output_tokens: Some(20),
                    total_tokens: Some(30),
                }),
                ..Default::default()
            },
        }];

        let mapped = map_response_events(events);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::UsageUpdate(TokenUsage {
                input_tokens: 10,
                output_tokens: 20,
                ..
            })
        ));
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::Stop(StopReason::MaxTokens)
        ));
    }
1222
    /// Two consecutive function-call items should yield two distinct `ToolUse`
    /// events (in stream order) followed by a single `Stop(ToolUse)`.
    #[test]
    fn responses_stream_handles_multiple_tool_calls() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn1", Some("{\"city\":\"NYC\"}")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn1".into(),
                output_index: 0,
                arguments: "{\"city\":\"NYC\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 1,
                sequence_number: None,
                item: response_item_function_call("item_fn2", Some("{\"city\":\"LA\"}")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn2".into(),
                output_index: 1,
                arguments: "{\"city\":\"LA\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert_eq!(mapped.len(), 3);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse { ref raw_input, .. })
            if raw_input == "{\"city\":\"NYC\"}"
        ));
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse { ref raw_input, .. })
            if raw_input == "{\"city\":\"LA\"}"
        ));
        assert!(matches!(
            mapped[2],
            LanguageModelCompletionEvent::Stop(StopReason::ToolUse)
        ));
    }
1270
    /// A message item followed by a function-call item should map to
    /// `StartMessage`, `Text`, `ToolUse`, and `Stop(ToolUse)` in that order —
    /// the tool call wins the stop reason over a plain end of turn.
    #[test]
    fn responses_stream_handles_mixed_text_and_tool_calls() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_message("msg_123"),
            },
            ResponsesStreamEvent::OutputTextDelta {
                item_id: "msg_123".into(),
                output_index: 0,
                content_index: Some(0),
                delta: "Let me check that".into(),
            },
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 1,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("{\"query\":\"test\"}")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 1,
                arguments: "{\"query\":\"test\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::StartMessage { .. }
        ));
        assert!(
            matches!(mapped[1], LanguageModelCompletionEvent::Text(ref text) if text == "Let me check that")
        );
        assert!(
            matches!(mapped[2], LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse { ref raw_input, .. }) if raw_input == "{\"query\":\"test\"}")
        );
        assert!(matches!(
            mapped[3],
            LanguageModelCompletionEvent::Stop(StopReason::ToolUse)
        ));
    }
1317
    /// Function-call arguments that are not valid JSON should surface as a
    /// `ToolUseJsonParseError` carrying the raw argument text, rather than
    /// crashing or being silently dropped.
    #[test]
    fn responses_stream_handles_json_parse_error() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("{invalid json")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 0,
                arguments: "{invalid json".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::ToolUseJsonParseError { ref raw_input, .. }
            if raw_input.as_ref() == "{invalid json"
        ));
    }
1344
    /// When the stream is cut off mid-call (no `FunctionCallArgumentsDone`),
    /// the `Incomplete` event's final output should still yield a complete
    /// `ToolUse` with the full arguments, then `Stop(MaxTokens)`.
    #[test]
    fn responses_stream_handles_incomplete_function_call() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("{\"city\":")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDelta {
                item_id: "item_fn".into(),
                output_index: 0,
                delta: "\"Boston\"".into(),
                sequence_number: None,
            },
            // No ArgumentsDone — the complete call only exists in the
            // Incomplete event's final `output`.
            ResponsesStreamEvent::Incomplete {
                response: ResponseSummary {
                    status_details: Some(ResponseStatusDetails {
                        reason: Some("max_output_tokens".into()),
                        r#type: Some("incomplete".into()),
                        error: None,
                    }),
                    output: vec![response_item_function_call(
                        "item_fn",
                        Some("{\"city\":\"Boston\"}"),
                    )],
                    ..Default::default()
                },
            },
        ];

        let mapped = map_response_events(events);
        assert_eq!(mapped.len(), 3);
        assert!(matches!(
            mapped[0],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                is_input_complete: false,
                ..
            })
        ));
        assert!(
            matches!(mapped[1], LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse { ref raw_input, is_input_complete: true, .. }) if raw_input == "{\"city\":\"Boston\"}")
        );
        assert!(matches!(
            mapped[2],
            LanguageModelCompletionEvent::Stop(StopReason::MaxTokens)
        ));
    }
1392
    /// A tool call already emitted via `FunctionCallArgumentsDone` must NOT be
    /// re-emitted from the `Incomplete` event's final output — exactly one
    /// `ToolUse` plus the `Stop(MaxTokens)` event.
    #[test]
    fn responses_stream_incomplete_does_not_duplicate_tool_calls() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("{\"city\":\"Boston\"}")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 0,
                arguments: "{\"city\":\"Boston\"}".into(),
                sequence_number: None,
            },
            // The same call reappears in the final output; it must be skipped.
            ResponsesStreamEvent::Incomplete {
                response: ResponseSummary {
                    status_details: Some(ResponseStatusDetails {
                        reason: Some("max_output_tokens".into()),
                        r#type: Some("incomplete".into()),
                        error: None,
                    }),
                    output: vec![response_item_function_call(
                        "item_fn",
                        Some("{\"city\":\"Boston\"}"),
                    )],
                    ..Default::default()
                },
            },
        ];

        let mapped = map_response_events(events);
        assert_eq!(mapped.len(), 2);
        assert!(
            matches!(mapped[0], LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse { ref raw_input, .. }) if raw_input == "{\"city\":\"Boston\"}")
        );
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::Stop(StopReason::MaxTokens)
        ));
    }
1433
    /// A tool call whose argument string is empty should still map to a valid
    /// `ToolUse` whose parsed input is an empty JSON object, not an error.
    #[test]
    fn responses_stream_handles_empty_tool_arguments() {
        let events = vec![
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: response_item_function_call("item_fn", Some("")),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 0,
                arguments: "".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert_eq!(mapped.len(), 2);
        assert!(matches!(
            &mapped[0],
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                id, name, raw_input, input, ..
            }) if id.to_string() == "call_123"
                && name.as_ref() == "get_weather"
                && raw_input == ""
                && input.is_object()
                && input.as_object().unwrap().is_empty()
        ));
        assert!(matches!(
            mapped[1],
            LanguageModelCompletionEvent::Stop(StopReason::ToolUse)
        ));
    }
1470
    /// Argument deltas should produce streaming partial `ToolUse` events
    /// (incomplete input) before the final complete one, and the last
    /// `ToolUse` in the stream must be the complete one.
    #[test]
    fn responses_stream_emits_partial_tool_use_events() {
        let events = vec![
            // Built inline (not via the helper) so the call starts with
            // completely empty arguments and a distinct call id.
            ResponsesStreamEvent::OutputItemAdded {
                output_index: 0,
                sequence_number: None,
                item: ResponseOutputItem::FunctionCall(
                    crate::responses::ResponseFunctionToolCall {
                        id: Some("item_fn".to_string()),
                        status: Some("in_progress".to_string()),
                        name: Some("get_weather".to_string()),
                        call_id: Some("call_abc".to_string()),
                        arguments: String::new(),
                    },
                ),
            },
            ResponsesStreamEvent::FunctionCallArgumentsDelta {
                item_id: "item_fn".into(),
                output_index: 0,
                delta: "{\"city\":\"Bos".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::FunctionCallArgumentsDelta {
                item_id: "item_fn".into(),
                output_index: 0,
                delta: "ton\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::FunctionCallArgumentsDone {
                item_id: "item_fn".into(),
                output_index: 0,
                arguments: "{\"city\":\"Boston\"}".into(),
                sequence_number: None,
            },
            ResponsesStreamEvent::Completed {
                response: ResponseSummary::default(),
            },
        ];

        let mapped = map_response_events(events);
        assert!(mapped.len() >= 3);

        let complete_tool_use = mapped.iter().find(|e| {
            matches!(
                e,
                LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                    is_input_complete: true,
                    ..
                })
            )
        });
        assert!(
            complete_tool_use.is_some(),
            "should have a complete tool use event"
        );

        let tool_uses: Vec<_> = mapped
            .iter()
            .filter(|e| matches!(e, LanguageModelCompletionEvent::ToolUse(_)))
            .collect();
        assert!(
            tool_uses.len() >= 2,
            "should have at least one partial and one complete event"
        );
        // The complete event must come after every partial one.
        assert!(matches!(
            tool_uses.last().unwrap(),
            LanguageModelCompletionEvent::ToolUse(LanguageModelToolUse {
                is_input_complete: true,
                ..
            })
        ));
    }
1543
1544    #[test]
1545    fn responses_stream_maps_reasoning_summary_deltas() {
1546        let events = vec![
1547            ResponsesStreamEvent::OutputItemAdded {
1548                output_index: 0,
1549                sequence_number: None,
1550                item: ResponseOutputItem::Reasoning(ResponseReasoningItem {
1551                    id: Some("rs_123".into()),
1552                    summary: vec![],
1553                }),
1554            },
1555            ResponsesStreamEvent::ReasoningSummaryPartAdded {
1556                item_id: "rs_123".into(),
1557                output_index: 0,
1558                summary_index: 0,
1559            },
1560            ResponsesStreamEvent::ReasoningSummaryTextDelta {
1561                item_id: "rs_123".into(),
1562                output_index: 0,
1563                delta: "Thinking about".into(),
1564            },
1565            ResponsesStreamEvent::ReasoningSummaryTextDelta {
1566                item_id: "rs_123".into(),
1567                output_index: 0,
1568                delta: " the answer".into(),
1569            },
1570            ResponsesStreamEvent::ReasoningSummaryTextDone {
1571                item_id: "rs_123".into(),
1572                output_index: 0,
1573                text: "Thinking about the answer".into(),
1574            },
1575            ResponsesStreamEvent::ReasoningSummaryPartDone {
1576                item_id: "rs_123".into(),
1577                output_index: 0,
1578                summary_index: 0,
1579            },
1580            ResponsesStreamEvent::ReasoningSummaryPartAdded {
1581                item_id: "rs_123".into(),
1582                output_index: 0,
1583                summary_index: 1,
1584            },
1585            ResponsesStreamEvent::ReasoningSummaryTextDelta {
1586                item_id: "rs_123".into(),
1587                output_index: 0,
1588                delta: "Second part".into(),
1589            },
1590            ResponsesStreamEvent::ReasoningSummaryTextDone {
1591                item_id: "rs_123".into(),
1592                output_index: 0,
1593                text: "Second part".into(),
1594            },
1595            ResponsesStreamEvent::ReasoningSummaryPartDone {
1596                item_id: "rs_123".into(),
1597                output_index: 0,
1598                summary_index: 1,
1599            },
1600            ResponsesStreamEvent::OutputItemDone {
1601                output_index: 0,
1602                sequence_number: None,
1603                item: ResponseOutputItem::Reasoning(ResponseReasoningItem {
1604                    id: Some("rs_123".into()),
1605                    summary: vec![
1606                        ReasoningSummaryPart::SummaryText {
1607                            text: "Thinking about the answer".into(),
1608                        },
1609                        ReasoningSummaryPart::SummaryText {
1610                            text: "Second part".into(),
1611                        },
1612                    ],
1613                }),
1614            },
1615            ResponsesStreamEvent::OutputItemAdded {
1616                output_index: 1,
1617                sequence_number: None,
1618                item: response_item_message("msg_456"),
1619            },
1620            ResponsesStreamEvent::OutputTextDelta {
1621                item_id: "msg_456".into(),
1622                output_index: 1,
1623                content_index: Some(0),
1624                delta: "The answer is 42".into(),
1625            },
1626            ResponsesStreamEvent::Completed {
1627                response: ResponseSummary::default(),
1628            },
1629        ];
1630
1631        let mapped = map_response_events(events);
1632
1633        let thinking_events: Vec<_> = mapped
1634            .iter()
1635            .filter(|e| matches!(e, LanguageModelCompletionEvent::Thinking { .. }))
1636            .collect();
1637        assert_eq!(
1638            thinking_events.len(),
1639            4,
1640            "expected 4 thinking events, got {:?}",
1641            thinking_events
1642        );
1643        assert!(
1644            matches!(&thinking_events[0], LanguageModelCompletionEvent::Thinking { text, .. } if text == "Thinking about")
1645        );
1646        assert!(
1647            matches!(&thinking_events[1], LanguageModelCompletionEvent::Thinking { text, .. } if text == " the answer")
1648        );
1649        assert!(
1650            matches!(&thinking_events[2], LanguageModelCompletionEvent::Thinking { text, .. } if text == "\n\n"),
1651            "expected separator between summary parts"
1652        );
1653        assert!(
1654            matches!(&thinking_events[3], LanguageModelCompletionEvent::Thinking { text, .. } if text == "Second part")
1655        );
1656
1657        assert!(mapped.iter().any(
1658            |e| matches!(e, LanguageModelCompletionEvent::Text(t) if t == "The answer is 42")
1659        ));
1660    }
1661
1662    #[test]
1663    fn responses_stream_maps_reasoning_from_done_only() {
1664        let events = vec![
1665            ResponsesStreamEvent::OutputItemAdded {
1666                output_index: 0,
1667                sequence_number: None,
1668                item: ResponseOutputItem::Reasoning(ResponseReasoningItem {
1669                    id: Some("rs_789".into()),
1670                    summary: vec![],
1671                }),
1672            },
1673            ResponsesStreamEvent::OutputItemDone {
1674                output_index: 0,
1675                sequence_number: None,
1676                item: ResponseOutputItem::Reasoning(ResponseReasoningItem {
1677                    id: Some("rs_789".into()),
1678                    summary: vec![ReasoningSummaryPart::SummaryText {
1679                        text: "Summary without deltas".into(),
1680                    }],
1681                }),
1682            },
1683            ResponsesStreamEvent::Completed {
1684                response: ResponseSummary::default(),
1685            },
1686        ];
1687
1688        let mapped = map_response_events(events);
1689        assert!(
1690            !mapped
1691                .iter()
1692                .any(|e| matches!(e, LanguageModelCompletionEvent::Thinking { .. })),
1693            "OutputItemDone reasoning should not produce Thinking events"
1694        );
1695    }
1696}