From 85294063fcf0d017d960b0e72c08554f1e6731bb Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Fri, 6 Feb 2026 20:21:58 -0800 Subject: [PATCH] Strip broken thinking blocks from Anthropic requests (#48548) TODO: - [x] Review code - [x] Decide whether to keep ignored API tests Release Notes: - Fixed a bug where cancelling a thread mid-thought would cause further anthropic requests to fail - Fixed a bug where the model configured on a thread would not be persisted alongside that thread --- Cargo.lock | 4 + crates/agent/src/agent.rs | 362 +++++++++++++++++- crates/agent/src/thread.rs | 13 +- crates/anthropic/Cargo.toml | 6 + crates/language_model/src/fake_provider.rs | 52 ++- .../language_models/src/provider/anthropic.rs | 349 +++++++++-------- .../language_models/src/provider/bedrock.rs | 10 + 7 files changed, 623 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7fdc32d03091956aa49bed4041dd1b87cf46792..19bd981e39253e977d5fd7676f376f245a244183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,13 +684,17 @@ dependencies = [ "anyhow", "chrono", "futures 0.3.31", + "gpui", + "gpui_tokio", "http_client", + "reqwest_client", "schemars", "serde", "serde_json", "settings", "strum 0.27.2", "thiserror 2.0.17", + "tokio", ] [[package]] diff --git a/crates/agent/src/agent.rs b/crates/agent/src/agent.rs index 5df384bbfd9100e166f245ca011619e0200bd727..4b66c4eba62e9ecec7500c934eee3ae776f58cef 100644 --- a/crates/agent/src/agent.rs +++ b/crates/agent/src/agent.rs @@ -1161,6 +1161,7 @@ impl acp_thread::AgentModelSelector for NativeAgentModelSelector { thread.update(cx, |thread, cx| { thread.set_model(model.clone(), cx); thread.set_thinking_effort(effort.clone(), cx); + thread.set_thinking_enabled(model.supports_thinking(), cx); }); update_settings_file( @@ -1169,16 +1170,7 @@ impl acp_thread::AgentModelSelector for NativeAgentModelSelector { move |settings, cx| { let provider = model.provider_id().0.to_string(); let model = model.id().0.to_string(); - let enable_thinking = settings - .agent - .as_ref() - .and_then(|agent| { - agent - .default_model - .as_ref() - .map(|default_model| default_model.enable_thinking) - }) - .unwrap_or_else(|| thread.read(cx).thinking_enabled()); + let enable_thinking = thread.read(cx).thinking_enabled(); settings .agent .get_or_insert_default() @@ -1644,7 +1636,8 @@ mod internal_tests { use fs::FakeFs; use gpui::TestAppContext; use indoc::formatdoc; - use language_model::fake_provider::FakeLanguageModel; + use language_model::fake_provider::{FakeLanguageModel, FakeLanguageModelProvider}; + use language_model::{LanguageModelProviderId, LanguageModelProviderName}; use serde_json::json; use settings::SettingsStore; use util::{path, rel_path::rel_path}; @@ -1853,6 +1846,353 @@ mod internal_tests { settings_json["agent"]["default_model"]["provider"], json!("fake") ); + + // Register a thinking model and select it. + cx.update(|cx| { + let thinking_model = Arc::new(FakeLanguageModel::with_id_and_thinking( + "fake-corp", + "fake-thinking", + "Fake Thinking", + true, + )); + let thinking_provider = Arc::new( + FakeLanguageModelProvider::new( + LanguageModelProviderId::from("fake-corp".to_string()), + LanguageModelProviderName::from("Fake Corp".to_string()), + ) + .with_models(vec![thinking_model]), + ); + LanguageModelRegistry::global(cx).update(cx, |registry, cx| { + registry.register_provider(thinking_provider, cx); + }); + }); + agent.update(cx, |agent, cx| agent.models.refresh_list(cx)); + + let selector = connection.model_selector(&session_id).unwrap(); + cx.update(|cx| selector.select_model(acp::ModelId::new("fake-corp/fake-thinking"), cx)) + .await + .unwrap(); + cx.run_until_parked(); + + // Verify enable_thinking was written to settings as true. + let settings_content = fs.load(paths::settings_file()).await.unwrap(); + let settings_json: serde_json::Value = serde_json::from_str(&settings_content).unwrap(); + assert_eq!( + settings_json["agent"]["default_model"]["enable_thinking"], + json!(true), + "selecting a thinking model should persist enable_thinking: true to settings" + ); + } + + #[gpui::test] + async fn test_select_model_updates_thinking_enabled(cx: &mut TestAppContext) { + init_test(cx); + let fs = FakeFs::new(cx.executor()); + fs.create_dir(paths::settings_file().parent().unwrap()) + .await + .unwrap(); + fs.insert_file(paths::settings_file(), b"{}".to_vec()).await; + let project = Project::test(fs.clone(), [], cx).await; + + let thread_store = cx.new(|cx| ThreadStore::new(cx)); + let agent = NativeAgent::new( + project.clone(), + thread_store, + Templates::new(), + None, + fs.clone(), + &mut cx.to_async(), + ) + .await + .unwrap(); + let connection = NativeAgentConnection(agent.clone()); + + let acp_thread = cx + .update(|cx| { + Rc::new(connection.clone()).new_thread(project.clone(), Path::new("/a"), cx) + }) + .await + .unwrap(); + let session_id = cx.update(|cx| acp_thread.read(cx).session_id().clone()); + + // Register a second provider with a thinking model. + cx.update(|cx| { + let thinking_model = Arc::new(FakeLanguageModel::with_id_and_thinking( + "fake-corp", + "fake-thinking", + "Fake Thinking", + true, + )); + let thinking_provider = Arc::new( + FakeLanguageModelProvider::new( + LanguageModelProviderId::from("fake-corp".to_string()), + LanguageModelProviderName::from("Fake Corp".to_string()), + ) + .with_models(vec![thinking_model]), + ); + LanguageModelRegistry::global(cx).update(cx, |registry, cx| { + registry.register_provider(thinking_provider, cx); + }); + }); + // Refresh the agent's model list so it picks up the new provider. + agent.update(cx, |agent, cx| agent.models.refresh_list(cx)); + + // Thread starts with thinking_enabled = false (the default). + agent.read_with(cx, |agent, _| { + let session = agent.sessions.get(&session_id).unwrap(); + session.thread.read_with(cx, |thread, _| { + assert!(!thread.thinking_enabled(), "thinking defaults to false"); + }); + }); + + // Select the thinking model via select_model. + let selector = connection.model_selector(&session_id).unwrap(); + cx.update(|cx| selector.select_model(acp::ModelId::new("fake-corp/fake-thinking"), cx)) + .await + .unwrap(); + + // select_model should have enabled thinking based on the model's supports_thinking(). + agent.read_with(cx, |agent, _| { + let session = agent.sessions.get(&session_id).unwrap(); + session.thread.read_with(cx, |thread, _| { + assert!( + thread.thinking_enabled(), + "select_model should enable thinking when model supports it" + ); + }); + }); + + // Switch back to the non-thinking model. + let selector = connection.model_selector(&session_id).unwrap(); + cx.update(|cx| selector.select_model(acp::ModelId::new("fake/fake"), cx)) + .await + .unwrap(); + + // select_model should have disabled thinking. + agent.read_with(cx, |agent, _| { + let session = agent.sessions.get(&session_id).unwrap(); + session.thread.read_with(cx, |thread, _| { + assert!( + !thread.thinking_enabled(), + "select_model should disable thinking when model does not support it" + ); + }); + }); + } + + #[gpui::test] + async fn test_loaded_thread_preserves_thinking_enabled(cx: &mut TestAppContext) { + init_test(cx); + let fs = FakeFs::new(cx.executor()); + fs.insert_tree("/", json!({ "a": {} })).await; + let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await; + let thread_store = cx.new(|cx| ThreadStore::new(cx)); + let agent = NativeAgent::new( + project.clone(), + thread_store.clone(), + Templates::new(), + None, + fs.clone(), + &mut cx.to_async(), + ) + .await + .unwrap(); + let connection = Rc::new(NativeAgentConnection(agent.clone())); + + // Register a thinking model. + let thinking_model = Arc::new(FakeLanguageModel::with_id_and_thinking( + "fake-corp", + "fake-thinking", + "Fake Thinking", + true, + )); + let thinking_provider = Arc::new( + FakeLanguageModelProvider::new( + LanguageModelProviderId::from("fake-corp".to_string()), + LanguageModelProviderName::from("Fake Corp".to_string()), + ) + .with_models(vec![thinking_model.clone()]), + ); + cx.update(|cx| { + LanguageModelRegistry::global(cx).update(cx, |registry, cx| { + registry.register_provider(thinking_provider, cx); + }); + }); + agent.update(cx, |agent, cx| agent.models.refresh_list(cx)); + + // Create a thread and select the thinking model. + let acp_thread = cx + .update(|cx| { + connection + .clone() + .new_thread(project.clone(), Path::new("/a"), cx) + }) + .await + .unwrap(); + let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone()); + + let selector = connection.model_selector(&session_id).unwrap(); + cx.update(|cx| selector.select_model(acp::ModelId::new("fake-corp/fake-thinking"), cx)) + .await + .unwrap(); + + // Verify thinking is enabled after selecting the thinking model. + let thread = agent.read_with(cx, |agent, _| { + agent.sessions.get(&session_id).unwrap().thread.clone() + }); + thread.read_with(cx, |thread, _| { + assert!( + thread.thinking_enabled(), + "thinking should be enabled after selecting thinking model" + ); + }); + + // Send a message so the thread gets persisted. + let send = acp_thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx)); + let send = cx.foreground_executor().spawn(send); + cx.run_until_parked(); + + thinking_model.send_last_completion_stream_text_chunk("Response."); + thinking_model.end_last_completion_stream(); + + send.await.unwrap(); + cx.run_until_parked(); + + // Drop the thread so it can be reloaded from disk. + cx.update(|_| { + drop(thread); + drop(acp_thread); + }); + agent.read_with(cx, |agent, _| { + assert!(agent.sessions.is_empty()); + }); + + // Reload the thread and verify thinking_enabled is still true. + let reloaded_acp_thread = agent + .update(cx, |agent, cx| agent.open_thread(session_id.clone(), cx)) + .await + .unwrap(); + let reloaded_thread = agent.read_with(cx, |agent, _| { + agent.sessions.get(&session_id).unwrap().thread.clone() + }); + reloaded_thread.read_with(cx, |thread, _| { + assert!( + thread.thinking_enabled(), + "thinking_enabled should be preserved when reloading a thread with a thinking model" + ); + }); + + drop(reloaded_acp_thread); + } + + #[gpui::test] + async fn test_loaded_thread_preserves_model(cx: &mut TestAppContext) { + init_test(cx); + let fs = FakeFs::new(cx.executor()); + fs.insert_tree("/", json!({ "a": {} })).await; + let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await; + let thread_store = cx.new(|cx| ThreadStore::new(cx)); + let agent = NativeAgent::new( + project.clone(), + thread_store.clone(), + Templates::new(), + None, + fs.clone(), + &mut cx.to_async(), + ) + .await + .unwrap(); + let connection = Rc::new(NativeAgentConnection(agent.clone())); + + // Register a model where id() != name(), like real Anthropic models + // (e.g. id="claude-sonnet-4-5-thinking-latest", name="Claude Sonnet 4.5 Thinking"). + let model = Arc::new(FakeLanguageModel::with_id_and_thinking( + "fake-corp", + "custom-model-id", + "Custom Model Display Name", + false, + )); + let provider = Arc::new( + FakeLanguageModelProvider::new( + LanguageModelProviderId::from("fake-corp".to_string()), + LanguageModelProviderName::from("Fake Corp".to_string()), + ) + .with_models(vec![model.clone()]), + ); + cx.update(|cx| { + LanguageModelRegistry::global(cx).update(cx, |registry, cx| { + registry.register_provider(provider, cx); + }); + }); + agent.update(cx, |agent, cx| agent.models.refresh_list(cx)); + + // Create a thread and select the model. + let acp_thread = cx + .update(|cx| { + connection + .clone() + .new_thread(project.clone(), Path::new("/a"), cx) + }) + .await + .unwrap(); + let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone()); + + let selector = connection.model_selector(&session_id).unwrap(); + cx.update(|cx| selector.select_model(acp::ModelId::new("fake-corp/custom-model-id"), cx)) + .await + .unwrap(); + + let thread = agent.read_with(cx, |agent, _| { + agent.sessions.get(&session_id).unwrap().thread.clone() + }); + thread.read_with(cx, |thread, _| { + assert_eq!( + thread.model().unwrap().id().0.as_ref(), + "custom-model-id", + "model should be set before persisting" + ); + }); + + // Send a message so the thread gets persisted. + let send = acp_thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx)); + let send = cx.foreground_executor().spawn(send); + cx.run_until_parked(); + + model.send_last_completion_stream_text_chunk("Response."); + model.end_last_completion_stream(); + + send.await.unwrap(); + cx.run_until_parked(); + + // Drop the thread so it can be reloaded from disk. + cx.update(|_| { + drop(thread); + drop(acp_thread); + }); + agent.read_with(cx, |agent, _| { + assert!(agent.sessions.is_empty()); + }); + + // Reload the thread and verify the model was preserved. + let reloaded_acp_thread = agent + .update(cx, |agent, cx| agent.open_thread(session_id.clone(), cx)) + .await + .unwrap(); + let reloaded_thread = agent.read_with(cx, |agent, _| { + agent.sessions.get(&session_id).unwrap().thread.clone() + }); + reloaded_thread.read_with(cx, |thread, _| { + let reloaded_model = thread + .model() + .expect("model should be present after reload"); + assert_eq!( + reloaded_model.id().0.as_ref(), + "custom-model-id", + "reloaded thread should have the same model, not fall back to the default" + ); + }); + + drop(reloaded_acp_thread); } #[gpui::test] diff --git a/crates/agent/src/thread.rs b/crates/agent/src/thread.rs index 1947553c1670b98ec4a33b2333981257dc205b10..24d661dbf038a9a12ed528c7f52e72aa72387c3e 100644 --- a/crates/agent/src/thread.rs +++ b/crates/agent/src/thread.rs @@ -1093,10 +1093,6 @@ impl Thread { let profile_id = db_thread .profile .unwrap_or_else(|| settings.default_profile.clone()); - let enable_thinking = settings - .default_model - .as_ref() - .is_some_and(|model| model.enable_thinking); let thinking_effort = settings .default_model .as_ref() @@ -1129,6 +1125,12 @@ impl Thread { watch::channel(Self::prompt_capabilities(model.as_deref())); let action_log = cx.new(|_| ActionLog::new(project.clone())); + // TODO: We should serialize the user's configured thinking parameter on `DbThread` + // rather than deriving it from the model's capability. A user may have explicitly + // toggled thinking off for a model that supports it, and we'd lose that preference here. + let enable_thinking = model + .as_deref() + .is_some_and(|model| model.supports_thinking()); Self { id, @@ -1156,7 +1158,6 @@ impl Thread { templates, model, summarization_model: None, - // TODO: Should we persist this on the `DbThread`? thinking_enabled: enable_thinking, thinking_effort, project, @@ -1183,7 +1184,7 @@ impl Thread { request_token_usage: self.request_token_usage.clone(), model: self.model.as_ref().map(|model| DbLanguageModel { provider: model.provider_id().to_string(), - model: model.name().0.to_string(), + model: model.id().0.to_string(), }), profile: Some(self.profile_id.clone()), imported: self.imported, diff --git a/crates/anthropic/Cargo.toml b/crates/anthropic/Cargo.toml index a9c7208b0caa9a2660aa723c903554205e672fe6..f344470475a7603782d3eba9a8c461a92d7b4855 100644 --- a/crates/anthropic/Cargo.toml +++ b/crates/anthropic/Cargo.toml @@ -26,3 +26,9 @@ serde_json.workspace = true settings.workspace = true strum.workspace = true thiserror.workspace = true + +[dev-dependencies] +reqwest_client.workspace = true +gpui_tokio.workspace = true +gpui.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/language_model/src/fake_provider.rs b/crates/language_model/src/fake_provider.rs index b06a475f9385012e5b88466c80fbb14e0ed744ac..ae01084a2657abdc86e7510aa49663cf98aabe70 100644 --- a/crates/language_model/src/fake_provider.rs +++ b/crates/language_model/src/fake_provider.rs @@ -19,6 +19,7 @@ use std::sync::{ pub struct FakeLanguageModelProvider { id: LanguageModelProviderId, name: LanguageModelProviderName, + models: Vec>, } impl Default for FakeLanguageModelProvider { @@ -26,6 +27,7 @@ impl Default for FakeLanguageModelProvider { Self { id: LanguageModelProviderId::from("fake".to_string()), name: LanguageModelProviderName::from("Fake".to_string()), + models: vec![Arc::new(FakeLanguageModel::default())], } } } @@ -48,15 +50,15 @@ impl LanguageModelProvider for FakeLanguageModelProvider { } fn default_model(&self, _cx: &App) -> Option> { - Some(Arc::new(FakeLanguageModel::default())) + self.models.first().cloned() } fn default_fast_model(&self, _cx: &App) -> Option> { - Some(Arc::new(FakeLanguageModel::default())) + self.models.first().cloned() } fn provided_models(&self, _: &App) -> Vec> { - vec![Arc::new(FakeLanguageModel::default())] + self.models.clone() } fn is_authenticated(&self, _: &App) -> bool { @@ -83,7 +85,16 @@ impl LanguageModelProvider for FakeLanguageModelProvider { impl FakeLanguageModelProvider { pub fn new(id: LanguageModelProviderId, name: LanguageModelProviderName) -> Self { - Self { id, name } + Self { + id, + name, + models: vec![Arc::new(FakeLanguageModel::default())], + } + } + + pub fn with_models(mut self, models: Vec>) -> Self { + self.models = models; + self } pub fn test_model(&self) -> FakeLanguageModel { @@ -100,6 +111,8 @@ pub struct ToolUseRequest { } pub struct FakeLanguageModel { + id: LanguageModelId, + name: LanguageModelName, provider_id: LanguageModelProviderId, provider_name: LanguageModelProviderName, current_completion_txs: Mutex< @@ -111,20 +124,39 @@ pub struct FakeLanguageModel { )>, >, forbid_requests: AtomicBool, + supports_thinking: AtomicBool, } impl Default for FakeLanguageModel { fn default() -> Self { Self { + id: LanguageModelId::from("fake".to_string()), + name: LanguageModelName::from("Fake".to_string()), provider_id: LanguageModelProviderId::from("fake".to_string()), provider_name: LanguageModelProviderName::from("Fake".to_string()), current_completion_txs: Mutex::new(Vec::new()), forbid_requests: AtomicBool::new(false), + supports_thinking: AtomicBool::new(false), } } } impl FakeLanguageModel { + pub fn with_id_and_thinking( + provider_id: &str, + id: &str, + name: &str, + supports_thinking: bool, + ) -> Self { + Self { + id: LanguageModelId::from(id.to_string()), + name: LanguageModelName::from(name.to_string()), + provider_id: LanguageModelProviderId::from(provider_id.to_string()), + supports_thinking: AtomicBool::new(supports_thinking), + ..Default::default() + } + } + pub fn allow_requests(&self) { self.forbid_requests.store(false, SeqCst); } @@ -133,6 +165,10 @@ impl FakeLanguageModel { self.forbid_requests.store(true, SeqCst); } + pub fn set_supports_thinking(&self, supports: bool) { + self.supports_thinking.store(supports, SeqCst); + } + pub fn pending_completions(&self) -> Vec { self.current_completion_txs .lock() @@ -215,11 +251,11 @@ impl FakeLanguageModel { impl LanguageModel for FakeLanguageModel { fn id(&self) -> LanguageModelId { - LanguageModelId::from("fake".to_string()) + self.id.clone() } fn name(&self) -> LanguageModelName { - LanguageModelName::from("Fake".to_string()) + self.name.clone() } fn provider_id(&self) -> LanguageModelProviderId { @@ -242,6 +278,10 @@ impl LanguageModel for FakeLanguageModel { false } + fn supports_thinking(&self) -> bool { + self.supports_thinking.load(SeqCst) + } + fn telemetry_id(&self) -> String { "fake".to_string() } diff --git a/crates/language_models/src/provider/anthropic.rs b/crates/language_models/src/provider/anthropic.rs index 5d4f3b9ef3f012b2d008c9e83bd6719f8d7f4542..47dec06232bb12e33ac144bb55201d825310b0fe 100644 --- a/crates/language_models/src/provider/anthropic.rs +++ b/crates/language_models/src/provider/anthropic.rs @@ -219,6 +219,82 @@ pub struct AnthropicModel { request_limiter: RateLimiter, } +fn to_anthropic_content(content: MessageContent) -> Option { + match content { + MessageContent::Text(text) => { + let text = if text.chars().last().is_some_and(|c| c.is_whitespace()) { + text.trim_end().to_string() + } else { + text + }; + if !text.is_empty() { + Some(anthropic::RequestContent::Text { + text, + cache_control: None, + }) + } else { + None + } + } + MessageContent::Thinking { + text: thinking, + signature, + } => { + if let Some(signature) = signature + && !thinking.is_empty() + { + Some(anthropic::RequestContent::Thinking { + thinking, + signature, + cache_control: None, + }) + } else { + None + } + } + MessageContent::RedactedThinking(data) => { + if !data.is_empty() { + Some(anthropic::RequestContent::RedactedThinking { data }) + } else { + None + } + } + MessageContent::Image(image) => Some(anthropic::RequestContent::Image { + source: anthropic::ImageSource { + source_type: "base64".to_string(), + media_type: "image/png".to_string(), + data: image.source.to_string(), + }, + cache_control: None, + }), + MessageContent::ToolUse(tool_use) => Some(anthropic::RequestContent::ToolUse { + id: tool_use.id.to_string(), + name: tool_use.name.to_string(), + input: tool_use.input, + cache_control: None, + }), + MessageContent::ToolResult(tool_result) => Some(anthropic::RequestContent::ToolResult { + tool_use_id: tool_result.tool_use_id.to_string(), + is_error: tool_result.is_error, + content: match tool_result.content { + LanguageModelToolResultContent::Text(text) => { + ToolResultContent::Plain(text.to_string()) + } + LanguageModelToolResultContent::Image(image) => { + ToolResultContent::Multipart(vec![ToolResultPart::Image { + source: anthropic::ImageSource { + source_type: "base64".to_string(), + media_type: "image/png".to_string(), + data: image.source.to_string(), + }, + }]) + } + }, + cache_control: None, + }), + } +} + /// Convert a LanguageModelRequest to an Anthropic CountTokensRequest. pub fn into_anthropic_count_tokens_request( request: LanguageModelRequest, @@ -238,87 +314,17 @@ pub fn into_anthropic_count_tokens_request( let anthropic_message_content: Vec = message .content .into_iter() - .filter_map(|content| match content { - MessageContent::Text(text) => { - let text = if text.chars().last().is_some_and(|c| c.is_whitespace()) { - text.trim_end().to_string() - } else { - text - }; - if !text.is_empty() { - Some(anthropic::RequestContent::Text { - text, - cache_control: None, - }) - } else { - None - } - } - MessageContent::Thinking { - text: thinking, - signature, - } => { - if !thinking.is_empty() { - Some(anthropic::RequestContent::Thinking { - thinking, - signature: signature.unwrap_or_default(), - cache_control: None, - }) - } else { - None - } - } - MessageContent::RedactedThinking(data) => { - if !data.is_empty() { - Some(anthropic::RequestContent::RedactedThinking { data }) - } else { - None - } - } - MessageContent::Image(image) => Some(anthropic::RequestContent::Image { - source: anthropic::ImageSource { - source_type: "base64".to_string(), - media_type: "image/png".to_string(), - data: image.source.to_string(), - }, - cache_control: None, - }), - MessageContent::ToolUse(tool_use) => { - Some(anthropic::RequestContent::ToolUse { - id: tool_use.id.to_string(), - name: tool_use.name.to_string(), - input: tool_use.input, - cache_control: None, - }) - } - MessageContent::ToolResult(tool_result) => { - Some(anthropic::RequestContent::ToolResult { - tool_use_id: tool_result.tool_use_id.to_string(), - is_error: tool_result.is_error, - content: match tool_result.content { - LanguageModelToolResultContent::Text(text) => { - ToolResultContent::Plain(text.to_string()) - } - LanguageModelToolResultContent::Image(image) => { - ToolResultContent::Multipart(vec![ToolResultPart::Image { - source: anthropic::ImageSource { - source_type: "base64".to_string(), - media_type: "image/png".to_string(), - data: image.source.to_string(), - }, - }]) - } - }, - cache_control: None, - }) - } - }) + .filter_map(to_anthropic_content) .collect(); let anthropic_role = match message.role { Role::User => anthropic::Role::User, Role::Assistant => anthropic::Role::Assistant, Role::System => unreachable!("System role should never occur here"), }; + if anthropic_message_content.is_empty() { + continue; + } + if let Some(last_message) = new_messages.last_mut() && last_message.role == anthropic_role { @@ -624,87 +630,17 @@ pub fn into_anthropic( let mut anthropic_message_content: Vec = message .content .into_iter() - .filter_map(|content| match content { - MessageContent::Text(text) => { - let text = if text.chars().last().is_some_and(|c| c.is_whitespace()) { - text.trim_end().to_string() - } else { - text - }; - if !text.is_empty() { - Some(anthropic::RequestContent::Text { - text, - cache_control: None, - }) - } else { - None - } - } - MessageContent::Thinking { - text: thinking, - signature, - } => { - if !thinking.is_empty() { - Some(anthropic::RequestContent::Thinking { - thinking, - signature: signature.unwrap_or_default(), - cache_control: None, - }) - } else { - None - } - } - MessageContent::RedactedThinking(data) => { - if !data.is_empty() { - Some(anthropic::RequestContent::RedactedThinking { data }) - } else { - None - } - } - MessageContent::Image(image) => Some(anthropic::RequestContent::Image { - source: anthropic::ImageSource { - source_type: "base64".to_string(), - media_type: "image/png".to_string(), - data: image.source.to_string(), - }, - cache_control: None, - }), - MessageContent::ToolUse(tool_use) => { - Some(anthropic::RequestContent::ToolUse { - id: tool_use.id.to_string(), - name: tool_use.name.to_string(), - input: tool_use.input, - cache_control: None, - }) - } - MessageContent::ToolResult(tool_result) => { - Some(anthropic::RequestContent::ToolResult { - tool_use_id: tool_result.tool_use_id.to_string(), - is_error: tool_result.is_error, - content: match tool_result.content { - LanguageModelToolResultContent::Text(text) => { - ToolResultContent::Plain(text.to_string()) - } - LanguageModelToolResultContent::Image(image) => { - ToolResultContent::Multipart(vec![ToolResultPart::Image { - source: anthropic::ImageSource { - source_type: "base64".to_string(), - media_type: "image/png".to_string(), - data: image.source.to_string(), - }, - }]) - } - }, - cache_control: None, - }) - } - }) + .filter_map(to_anthropic_content) .collect(); let anthropic_role = match message.role { Role::User => anthropic::Role::User, Role::Assistant => anthropic::Role::Assistant, Role::System => unreachable!("System role should never occur here"), }; + if anthropic_message_content.is_empty() { + continue; + } + if let Some(last_message) = new_messages.last_mut() && last_message.role == anthropic_role { @@ -1208,4 +1144,117 @@ mod tests { } )); } + + fn request_with_assistant_content( + assistant_content: Vec, + ) -> anthropic::Request { + let mut request = LanguageModelRequest { + messages: vec![LanguageModelRequestMessage { + role: Role::User, + content: vec![MessageContent::Text("Hello".to_string())], + cache: false, + reasoning_details: None, + }], + thinking_effort: None, + thread_id: None, + prompt_id: None, + intent: None, + stop: vec![], + temperature: None, + tools: vec![], + tool_choice: None, + thinking_allowed: true, + }; + request.messages.push(LanguageModelRequestMessage { + role: Role::Assistant, + content: assistant_content, + cache: false, + reasoning_details: None, + }); + into_anthropic( + request, + "claude-sonnet-4-5".to_string(), + 1.0, + 16000, + AnthropicModelMode::Thinking { + budget_tokens: Some(10000), + }, + ) + } + + #[test] + fn test_unsigned_thinking_blocks_stripped() { + let result = request_with_assistant_content(vec![ + MessageContent::Thinking { + text: "Cancelled mid-think, no signature".to_string(), + signature: None, + }, + MessageContent::Text("Some response text".to_string()), + ]); + + let assistant_message = result + .messages + .iter() + .find(|m| m.role == anthropic::Role::Assistant) + .expect("assistant message should still exist"); + + assert_eq!( + assistant_message.content.len(), + 1, + "Only the text content should remain; unsigned thinking block should be stripped" + ); + assert!(matches!( + &assistant_message.content[0], + anthropic::RequestContent::Text { text, .. } if text == "Some response text" + )); + } + + #[test] + fn test_signed_thinking_blocks_preserved() { + let result = request_with_assistant_content(vec![ + MessageContent::Thinking { + text: "Completed thinking".to_string(), + signature: Some("valid-signature".to_string()), + }, + MessageContent::Text("Response".to_string()), + ]); + + let assistant_message = result + .messages + .iter() + .find(|m| m.role == anthropic::Role::Assistant) + .expect("assistant message should exist"); + + assert_eq!( + assistant_message.content.len(), + 2, + "Both the signed thinking block and text should be preserved" + ); + assert!(matches!( + &assistant_message.content[0], + anthropic::RequestContent::Thinking { thinking, signature, .. } + if thinking == "Completed thinking" && signature == "valid-signature" + )); + } + + #[test] + fn test_only_unsigned_thinking_block_omits_entire_message() { + let result = request_with_assistant_content(vec![MessageContent::Thinking { + text: "Cancelled before any text or signature".to_string(), + signature: None, + }]); + + let assistant_messages: Vec<_> = result + .messages + .iter() + .filter(|m| m.role == anthropic::Role::Assistant) + .collect(); + + assert_eq!( + assistant_messages.len(), + 0, + "An assistant message whose only content was an unsigned thinking block \ + should be omitted entirely" + ); + } } diff --git a/crates/language_models/src/provider/bedrock.rs b/crates/language_models/src/provider/bedrock.rs index da7118f0ec1220e7ddeb681b7f6258cbfaf4e5e8..98b276bb49b76aa78c7cbd0a21966a5845044209 100644 --- a/crates/language_models/src/provider/bedrock.rs +++ b/crates/language_models/src/provider/bedrock.rs @@ -770,6 +770,12 @@ pub fn into_bedrock( // And the AWS API demands that you strip them return None; } + if signature.is_none() { + // Thinking blocks without a signature are invalid + // (e.g. from cancellation mid-think) and must be + // stripped to avoid API errors. + return None; + } let thinking = BedrockThinkingTextBlock::builder() .text(text) .set_signature(signature) @@ -850,6 +856,10 @@ pub fn into_bedrock( Role::Assistant => bedrock::BedrockRole::Assistant, Role::System => unreachable!("System role should never occur here"), }; + if bedrock_message_content.is_empty() { + continue; + } + if let Some(last_message) = new_messages.last_mut() && last_message.role == bedrock_role {