thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ProjectSnapshot, ReadFileTool,
   5    RestoreFileFromDiskTool, SaveFileTool, StreamingEditFileTool, SubagentTool,
   6    SystemPromptTemplate, Template, Templates, TerminalTool, ThinkingTool, ToolPermissionDecision,
   7    WebSearchTool, decide_permission_from_settings,
   8};
   9use acp_thread::{MentionUri, UserMessageId};
  10use action_log::ActionLog;
  11use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt as _, SubagentsFeatureFlag};
  12
  13use agent_client_protocol as acp;
  14use agent_settings::{
  15    AgentProfileId, AgentProfileSettings, AgentSettings, SUMMARIZE_THREAD_DETAILED_PROMPT,
  16    SUMMARIZE_THREAD_PROMPT,
  17};
  18use anyhow::{Context as _, Result, anyhow};
  19use chrono::{DateTime, Utc};
  20use client::UserStore;
  21use cloud_llm_client::{CompletionIntent, Plan};
  22use collections::{HashMap, HashSet, IndexMap};
  23use fs::Fs;
  24use futures::stream;
  25use futures::{
  26    FutureExt,
  27    channel::{mpsc, oneshot},
  28    future::Shared,
  29    stream::FuturesUnordered,
  30};
  31use gpui::{
  32    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  33};
  34use language_model::{
  35    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  36    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  37    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  38    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  39    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage, ZED_CLOUD_PROVIDER_ID,
  40};
  41use project::Project;
  42use prompt_store::ProjectContext;
  43use schemars::{JsonSchema, Schema};
  44use serde::{Deserialize, Serialize};
  45use settings::{LanguageModelSelection, Settings, ToolPermissionMode, update_settings_file};
  46use smol::stream::StreamExt;
  47use std::{
  48    collections::BTreeMap,
  49    ops::RangeInclusive,
  50    path::Path,
  51    rc::Rc,
  52    sync::Arc,
  53    time::{Duration, Instant},
  54};
  55use std::{fmt::Write, path::PathBuf};
  56use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock, paths::PathStyle};
  57use uuid::Uuid;
  58
/// Message text associated with a tool call that the user canceled.
/// NOTE(review): the use sites are outside this view.
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
/// Upper bound on the length of a tool name (presumably in bytes — TODO confirm at use sites).
pub const MAX_TOOL_NAME_LENGTH: usize = 64;
/// Maximum subagent nesting depth; see `SubagentContext::depth` (0 = root agent).
pub const MAX_SUBAGENT_DEPTH: u8 = 4;
/// Upper bound on subagents running in parallel (presumably per parent thread — TODO confirm).
pub const MAX_PARALLEL_SUBAGENTS: usize = 8;
  63
/// Context passed to a subagent thread for lifecycle management.
///
/// Stored on a `Thread` as `subagent_context`, where `Some(..)` marks the thread
/// as a subagent and carries information about its parent.
#[derive(Clone)]
pub struct SubagentContext {
    /// ID of the parent thread
    pub parent_thread_id: acp::SessionId,

    /// ID of the tool call that spawned this subagent
    pub tool_use_id: LanguageModelToolUseId,

    /// Current depth level (0 = root agent, 1 = first-level subagent, etc.)
    pub depth: u8,

    /// Prompt to send when subagent completes successfully
    pub summary_prompt: String,

    /// Prompt to send when context is running low (≤25% remaining)
    pub context_low_prompt: String,
}
  82
/// The ID of the user prompt that initiated a request.
///
/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
///
/// The ID is stored as a shared string (`Arc<str>`), so cloning it does not copy the text.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct PromptId(Arc<str>);
  88
  89impl PromptId {
  90    pub fn new() -> Self {
  91        Self(Uuid::new_v4().to_string().into())
  92    }
  93}
  94
  95impl std::fmt::Display for PromptId {
  96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  97        write!(f, "{}", self.0)
  98    }
  99}
 100
/// Maximum number of retry attempts.
/// NOTE(review): the retry loop that applies this limit is outside this view.
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Base delay between retries (5 seconds); presumably the starting point for backoff — TODO confirm.
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);
 103
/// Describes how a failed request should be retried.
///
/// NOTE(review): the code that selects and applies a strategy is outside this
/// view; the field descriptions below are inferred from names — confirm.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Retry with a delay that presumably grows from `initial_delay` on each attempt.
    ExponentialBackoff {
        /// Delay before the first retry.
        initial_delay: Duration,
        /// Maximum number of attempts before giving up.
        max_attempts: u8,
    },
    /// Retry with the same `delay` before every attempt.
    Fixed {
        /// Delay before each retry.
        delay: Duration,
        /// Maximum number of attempts before giving up.
        max_attempts: u8,
    },
}
 115
/// A single entry in a thread's message history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// A message submitted by the user.
    User(UserMessage),
    /// A message produced by the agent, including any tool uses and their results.
    Agent(AgentMessage),
    /// A marker that is sent to the model as a user-role
    /// "Continue where you left off" message.
    Resume,
}
 122
 123impl Message {
 124    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 125        match self {
 126            Message::Agent(agent_message) => Some(agent_message),
 127            _ => None,
 128        }
 129    }
 130
 131    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 132        match self {
 133            Message::User(message) => {
 134                if message.content.is_empty() {
 135                    vec![]
 136                } else {
 137                    vec![message.to_request()]
 138                }
 139            }
 140            Message::Agent(message) => message.to_request(),
 141            Message::Resume => vec![LanguageModelRequestMessage {
 142                role: Role::User,
 143                content: vec!["Continue where you left off".into()],
 144                cache: false,
 145                reasoning_details: None,
 146            }],
 147        }
 148    }
 149
 150    pub fn to_markdown(&self) -> String {
 151        match self {
 152            Message::User(message) => message.to_markdown(),
 153            Message::Agent(message) => message.to_markdown(),
 154            Message::Resume => "[resume]\n".into(),
 155        }
 156    }
 157
 158    pub fn role(&self) -> Role {
 159        match self {
 160            Message::User(_) | Message::Resume => Role::User,
 161            Message::Agent(_) => Role::Assistant,
 162        }
 163    }
 164}
 165
/// A message submitted by the user, made up of an ordered list of content chunks.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this user message.
    pub id: UserMessageId,
    /// The chunks (text, mentions, images) that make up the message, in order.
    pub content: Vec<UserMessageContent>,
}
 171
/// One chunk of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text typed by the user.
    Text(String),
    /// A reference to some resource (`uri`) together with the `content` attached for it.
    /// When building a request, the link is kept inline and `content` is collected
    /// into a trailing context section.
    Mention { uri: MentionUri, content: String },
    /// An image attached to the message.
    Image(LanguageModelImage),
}
 178
 179impl UserMessage {
 180    pub fn to_markdown(&self) -> String {
 181        let mut markdown = String::from("## User\n\n");
 182
 183        for content in &self.content {
 184            match content {
 185                UserMessageContent::Text(text) => {
 186                    markdown.push_str(text);
 187                    markdown.push('\n');
 188                }
 189                UserMessageContent::Image(_) => {
 190                    markdown.push_str("<image />\n");
 191                }
 192                UserMessageContent::Mention { uri, content } => {
 193                    if !content.is_empty() {
 194                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 195                    } else {
 196                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 197                    }
 198                }
 199            }
 200        }
 201
 202        markdown
 203    }
 204
 205    fn to_request(&self) -> LanguageModelRequestMessage {
 206        let mut message = LanguageModelRequestMessage {
 207            role: Role::User,
 208            content: Vec::with_capacity(self.content.len()),
 209            cache: false,
 210            reasoning_details: None,
 211        };
 212
 213        const OPEN_CONTEXT: &str = "<context>\n\
 214            The following items were attached by the user. \
 215            They are up-to-date and don't need to be re-read.\n\n";
 216
 217        const OPEN_FILES_TAG: &str = "<files>";
 218        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 219        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 220        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 221        const OPEN_THREADS_TAG: &str = "<threads>";
 222        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 223        const OPEN_RULES_TAG: &str =
 224            "<rules>\nThe user has specified the following rules that should be applied:\n";
 225        const OPEN_DIAGNOSTICS_TAG: &str = "<diagnostics>";
 226
 227        let mut file_context = OPEN_FILES_TAG.to_string();
 228        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 229        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 230        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 231        let mut thread_context = OPEN_THREADS_TAG.to_string();
 232        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 233        let mut rules_context = OPEN_RULES_TAG.to_string();
 234        let mut diagnostics_context = OPEN_DIAGNOSTICS_TAG.to_string();
 235
 236        for chunk in &self.content {
 237            let chunk = match chunk {
 238                UserMessageContent::Text(text) => {
 239                    language_model::MessageContent::Text(text.clone())
 240                }
 241                UserMessageContent::Image(value) => {
 242                    language_model::MessageContent::Image(value.clone())
 243                }
 244                UserMessageContent::Mention { uri, content } => {
 245                    match uri {
 246                        MentionUri::File { abs_path } => {
 247                            write!(
 248                                &mut file_context,
 249                                "\n{}",
 250                                MarkdownCodeBlock {
 251                                    tag: &codeblock_tag(abs_path, None),
 252                                    text: &content.to_string(),
 253                                }
 254                            )
 255                            .ok();
 256                        }
 257                        MentionUri::PastedImage => {
 258                            debug_panic!("pasted image URI should not be used in mention content")
 259                        }
 260                        MentionUri::Directory { .. } => {
 261                            write!(&mut directory_context, "\n{}\n", content).ok();
 262                        }
 263                        MentionUri::Symbol {
 264                            abs_path: path,
 265                            line_range,
 266                            ..
 267                        } => {
 268                            write!(
 269                                &mut symbol_context,
 270                                "\n{}",
 271                                MarkdownCodeBlock {
 272                                    tag: &codeblock_tag(path, Some(line_range)),
 273                                    text: content
 274                                }
 275                            )
 276                            .ok();
 277                        }
 278                        MentionUri::Selection {
 279                            abs_path: path,
 280                            line_range,
 281                            ..
 282                        } => {
 283                            write!(
 284                                &mut selection_context,
 285                                "\n{}",
 286                                MarkdownCodeBlock {
 287                                    tag: &codeblock_tag(
 288                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 289                                        Some(line_range)
 290                                    ),
 291                                    text: content
 292                                }
 293                            )
 294                            .ok();
 295                        }
 296                        MentionUri::Thread { .. } => {
 297                            write!(&mut thread_context, "\n{}\n", content).ok();
 298                        }
 299                        MentionUri::TextThread { .. } => {
 300                            write!(&mut thread_context, "\n{}\n", content).ok();
 301                        }
 302                        MentionUri::Rule { .. } => {
 303                            write!(
 304                                &mut rules_context,
 305                                "\n{}",
 306                                MarkdownCodeBlock {
 307                                    tag: "",
 308                                    text: content
 309                                }
 310                            )
 311                            .ok();
 312                        }
 313                        MentionUri::Fetch { url } => {
 314                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 315                        }
 316                        MentionUri::Diagnostics { .. } => {
 317                            write!(&mut diagnostics_context, "\n{}\n", content).ok();
 318                        }
 319                    }
 320
 321                    language_model::MessageContent::Text(uri.as_link().to_string())
 322                }
 323            };
 324
 325            message.content.push(chunk);
 326        }
 327
 328        let len_before_context = message.content.len();
 329
 330        if file_context.len() > OPEN_FILES_TAG.len() {
 331            file_context.push_str("</files>\n");
 332            message
 333                .content
 334                .push(language_model::MessageContent::Text(file_context));
 335        }
 336
 337        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 338            directory_context.push_str("</directories>\n");
 339            message
 340                .content
 341                .push(language_model::MessageContent::Text(directory_context));
 342        }
 343
 344        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 345            symbol_context.push_str("</symbols>\n");
 346            message
 347                .content
 348                .push(language_model::MessageContent::Text(symbol_context));
 349        }
 350
 351        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 352            selection_context.push_str("</selections>\n");
 353            message
 354                .content
 355                .push(language_model::MessageContent::Text(selection_context));
 356        }
 357
 358        if thread_context.len() > OPEN_THREADS_TAG.len() {
 359            thread_context.push_str("</threads>\n");
 360            message
 361                .content
 362                .push(language_model::MessageContent::Text(thread_context));
 363        }
 364
 365        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 366            fetch_context.push_str("</fetched_urls>\n");
 367            message
 368                .content
 369                .push(language_model::MessageContent::Text(fetch_context));
 370        }
 371
 372        if rules_context.len() > OPEN_RULES_TAG.len() {
 373            rules_context.push_str("</user_rules>\n");
 374            message
 375                .content
 376                .push(language_model::MessageContent::Text(rules_context));
 377        }
 378
 379        if diagnostics_context.len() > OPEN_DIAGNOSTICS_TAG.len() {
 380            diagnostics_context.push_str("</diagnostics>\n");
 381            message
 382                .content
 383                .push(language_model::MessageContent::Text(diagnostics_context));
 384        }
 385
 386        if message.content.len() > len_before_context {
 387            message.content.insert(
 388                len_before_context,
 389                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 390            );
 391            message
 392                .content
 393                .push(language_model::MessageContent::Text("</context>".into()));
 394        }
 395
 396        message
 397    }
 398}
 399
 400fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 401    let mut result = String::new();
 402
 403    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 404        let _ = write!(result, "{} ", extension);
 405    }
 406
 407    let _ = write!(result, "{}", full_path.display());
 408
 409    if let Some(range) = line_range {
 410        if range.start() == range.end() {
 411            let _ = write!(result, ":{}", range.start() + 1);
 412        } else {
 413            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 414        }
 415    }
 416
 417    result
 418}
 419
 420impl AgentMessage {
 421    pub fn to_markdown(&self) -> String {
 422        let mut markdown = String::from("## Assistant\n\n");
 423
 424        for content in &self.content {
 425            match content {
 426                AgentMessageContent::Text(text) => {
 427                    markdown.push_str(text);
 428                    markdown.push('\n');
 429                }
 430                AgentMessageContent::Thinking { text, .. } => {
 431                    markdown.push_str("<think>");
 432                    markdown.push_str(text);
 433                    markdown.push_str("</think>\n");
 434                }
 435                AgentMessageContent::RedactedThinking(_) => {
 436                    markdown.push_str("<redacted_thinking />\n")
 437                }
 438                AgentMessageContent::ToolUse(tool_use) => {
 439                    markdown.push_str(&format!(
 440                        "**Tool Use**: {} (ID: {})\n",
 441                        tool_use.name, tool_use.id
 442                    ));
 443                    markdown.push_str(&format!(
 444                        "{}\n",
 445                        MarkdownCodeBlock {
 446                            tag: "json",
 447                            text: &format!("{:#}", tool_use.input)
 448                        }
 449                    ));
 450                }
 451            }
 452        }
 453
 454        for tool_result in self.tool_results.values() {
 455            markdown.push_str(&format!(
 456                "**Tool Result**: {} (ID: {})\n\n",
 457                tool_result.tool_name, tool_result.tool_use_id
 458            ));
 459            if tool_result.is_error {
 460                markdown.push_str("**ERROR:**\n");
 461            }
 462
 463            match &tool_result.content {
 464                LanguageModelToolResultContent::Text(text) => {
 465                    writeln!(markdown, "{text}\n").ok();
 466                }
 467                LanguageModelToolResultContent::Image(_) => {
 468                    writeln!(markdown, "<image />\n").ok();
 469                }
 470            }
 471
 472            if let Some(output) = tool_result.output.as_ref() {
 473                writeln!(
 474                    markdown,
 475                    "**Debug Output**:\n\n```json\n{}\n```\n",
 476                    serde_json::to_string_pretty(output).unwrap()
 477                )
 478                .unwrap();
 479            }
 480        }
 481
 482        markdown
 483    }
 484
 485    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 486        let mut assistant_message = LanguageModelRequestMessage {
 487            role: Role::Assistant,
 488            content: Vec::with_capacity(self.content.len()),
 489            cache: false,
 490            reasoning_details: self.reasoning_details.clone(),
 491        };
 492        for chunk in &self.content {
 493            match chunk {
 494                AgentMessageContent::Text(text) => {
 495                    assistant_message
 496                        .content
 497                        .push(language_model::MessageContent::Text(text.clone()));
 498                }
 499                AgentMessageContent::Thinking { text, signature } => {
 500                    assistant_message
 501                        .content
 502                        .push(language_model::MessageContent::Thinking {
 503                            text: text.clone(),
 504                            signature: signature.clone(),
 505                        });
 506                }
 507                AgentMessageContent::RedactedThinking(value) => {
 508                    assistant_message.content.push(
 509                        language_model::MessageContent::RedactedThinking(value.clone()),
 510                    );
 511                }
 512                AgentMessageContent::ToolUse(tool_use) => {
 513                    if self.tool_results.contains_key(&tool_use.id) {
 514                        assistant_message
 515                            .content
 516                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 517                    }
 518                }
 519            };
 520        }
 521
 522        let mut user_message = LanguageModelRequestMessage {
 523            role: Role::User,
 524            content: Vec::new(),
 525            cache: false,
 526            reasoning_details: None,
 527        };
 528
 529        for tool_result in self.tool_results.values() {
 530            let mut tool_result = tool_result.clone();
 531            // Surprisingly, the API fails if we return an empty string here.
 532            // It thinks we are sending a tool use without a tool result.
 533            if tool_result.content.is_empty() {
 534                tool_result.content = "<Tool returned an empty string>".into();
 535            }
 536            user_message
 537                .content
 538                .push(language_model::MessageContent::ToolResult(tool_result));
 539        }
 540
 541        let mut messages = Vec::new();
 542        if !assistant_message.content.is_empty() {
 543            messages.push(assistant_message);
 544        }
 545        if !user_message.content.is_empty() {
 546            messages.push(user_message);
 547        }
 548        messages
 549    }
 550}
 551
/// A message produced by the agent, together with the results of the tools it invoked.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Ordered content chunks (text, thinking, tool uses).
    pub content: Vec<AgentMessageContent>,
    /// Tool results keyed by the ID of the tool use they answer.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
    /// Opaque reasoning details; copied onto the assistant request message as-is.
    pub reasoning_details: Option<serde_json::Value>,
}
 558
/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Plain response text.
    Text(String),
    /// Model "thinking" text with an optional signature.
    /// NOTE(review): the signature's meaning is provider-specific and not shown here.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Thinking content that was redacted; rendered only as a placeholder in Markdown.
    RedactedThinking(String),
    /// A request by the model to invoke a tool.
    ToolUse(LanguageModelToolUse),
}
 569
/// Handle to a terminal created through a [`ThreadEnvironment`].
///
/// NOTE(review): implementations are outside this view; the method
/// descriptions below are inferred from the signatures — confirm.
pub trait TerminalHandle {
    /// Returns the terminal's ID.
    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
    /// Returns the terminal's output response at the time of the call.
    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
    /// Returns a shareable task that resolves to the terminal's exit status.
    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
    /// Requests that the terminal be killed.
    fn kill(&self, cx: &AsyncApp) -> Result<()>;
    /// Reports whether the terminal was stopped by the user.
    fn was_stopped_by_user(&self, cx: &AsyncApp) -> Result<bool>;
}
 577
/// Environment services available to a thread; currently only terminal creation.
pub trait ThreadEnvironment {
    /// Starts a terminal for `command` and returns a task resolving to its handle.
    ///
    /// `cwd` is the optional working directory and `output_byte_limit` an optional
    /// cap on retained output (presumably in bytes — TODO confirm in implementations).
    fn create_terminal(
        &self,
        command: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
}
 587
/// Events describing the progress of a thread's turn.
///
/// NOTE(review): the producer and consumers of these events are outside this
/// view; variant descriptions are inferred from names and payload types.
#[derive(Debug)]
pub enum ThreadEvent {
    /// A user message was added.
    UserMessage(UserMessage),
    /// A piece of agent response text.
    AgentText(String),
    /// A piece of agent thinking text.
    AgentThinking(String),
    /// A tool call was started.
    ToolCall(acp::ToolCall),
    /// An existing tool call was updated.
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    /// A tool call needs the user's authorization before it can proceed.
    ToolCallAuthorization(ToolCallAuthorization),
    /// A request is being retried.
    Retry(acp_thread::RetryStatus),
    /// The turn stopped for the given reason.
    Stop(acp::StopReason),
}
 599
/// A request to create a terminal, with a channel on which the result is returned.
///
/// NOTE(review): where this request is sent and handled is outside this view.
#[derive(Debug)]
pub struct NewTerminal {
    /// Command to run in the terminal.
    pub command: String,
    /// Optional cap on retained output (presumably in bytes — TODO confirm).
    pub output_byte_limit: Option<u64>,
    /// Optional working directory for the command.
    pub cwd: Option<PathBuf>,
    /// One-shot channel used to deliver the created terminal entity or an error.
    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
}
 607
/// The tool name and input from which permission options for a tool call are built.
#[derive(Debug, Clone)]
pub struct ToolPermissionContext {
    /// Name of the tool requesting permission (e.g. `"terminal"`, `"edit_file"`, `"fetch"`).
    pub tool_name: String,
    /// The tool input from which an "always allow/deny" pattern is extracted.
    pub input_value: String,
}
 613
 614impl ToolPermissionContext {
 615    pub fn new(tool_name: impl Into<String>, input_value: impl Into<String>) -> Self {
 616        Self {
 617            tool_name: tool_name.into(),
 618            input_value: input_value.into(),
 619        }
 620    }
 621
 622    /// Builds the permission options for this tool context.
 623    ///
 624    /// This is the canonical source for permission option generation.
 625    /// Tests should use this function rather than manually constructing options.
 626    pub fn build_permission_options(&self) -> acp_thread::PermissionOptions {
 627        use crate::pattern_extraction::*;
 628
 629        let tool_name = &self.tool_name;
 630        let input_value = &self.input_value;
 631
 632        let (pattern, pattern_display) = match tool_name.as_str() {
 633            "terminal" => (
 634                extract_terminal_pattern(input_value),
 635                extract_terminal_pattern_display(input_value),
 636            ),
 637            "edit_file" | "delete_path" | "move_path" | "create_directory" | "save_file" => (
 638                extract_path_pattern(input_value),
 639                extract_path_pattern_display(input_value),
 640            ),
 641            "fetch" => (
 642                extract_url_pattern(input_value),
 643                extract_url_pattern_display(input_value),
 644            ),
 645            _ => (None, None),
 646        };
 647
 648        let mut choices = Vec::new();
 649
 650        let mut push_choice = |label: String, allow_id, deny_id, allow_kind, deny_kind| {
 651            choices.push(acp_thread::PermissionOptionChoice {
 652                allow: acp::PermissionOption::new(
 653                    acp::PermissionOptionId::new(allow_id),
 654                    label.clone(),
 655                    allow_kind,
 656                ),
 657                deny: acp::PermissionOption::new(
 658                    acp::PermissionOptionId::new(deny_id),
 659                    label,
 660                    deny_kind,
 661                ),
 662            });
 663        };
 664
 665        push_choice(
 666            format!("Always for {}", tool_name.replace('_', " ")),
 667            format!("always_allow:{}", tool_name),
 668            format!("always_deny:{}", tool_name),
 669            acp::PermissionOptionKind::AllowAlways,
 670            acp::PermissionOptionKind::RejectAlways,
 671        );
 672
 673        if let (Some(pattern), Some(display)) = (pattern, pattern_display) {
 674            let button_text = match tool_name.as_str() {
 675                "terminal" => format!("Always for `{}` commands", display),
 676                "fetch" => format!("Always for `{}`", display),
 677                _ => format!("Always for `{}`", display),
 678            };
 679            push_choice(
 680                button_text,
 681                format!("always_allow_pattern:{}:{}", tool_name, pattern),
 682                format!("always_deny_pattern:{}:{}", tool_name, pattern),
 683                acp::PermissionOptionKind::AllowAlways,
 684                acp::PermissionOptionKind::RejectAlways,
 685            );
 686        }
 687
 688        push_choice(
 689            "Only this time".to_string(),
 690            "allow".to_string(),
 691            "deny".to_string(),
 692            acp::PermissionOptionKind::AllowOnce,
 693            acp::PermissionOptionKind::RejectOnce,
 694        );
 695
 696        acp_thread::PermissionOptions::Dropdown(choices)
 697    }
 698}
 699
/// A request for the user to authorize a tool call.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call that is awaiting authorization.
    pub tool_call: acp::ToolCallUpdate,
    /// The permission options to offer.
    pub options: acp_thread::PermissionOptions,
    /// One-shot channel on which the ID of the chosen option is sent back.
    pub response: oneshot::Sender<acp::PermissionOptionId>,
    /// The tool context associated with this request, if any.
    /// NOTE(review): how consumers use it is outside this view.
    pub context: Option<ToolPermissionContext>,
}
 707
/// Reasons a completion can end unsuccessfully.
///
/// NOTE(review): the code that produces these errors is outside this view.
#[derive(Debug, thiserror::Error)]
enum CompletionError {
    /// Displayed as "max tokens"; presumably the model hit its token limit — TODO confirm.
    #[error("max tokens")]
    MaxTokens,
    /// Displayed as "refusal"; presumably the model refused to respond — TODO confirm.
    #[error("refusal")]
    Refusal,
    /// Any other error; displayed transparently and convertible from `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
 717
/// State of a single agent conversation: its message history, the model and
/// tools it uses, and bookkeeping for the turn currently in progress.
pub struct Thread {
    /// Session ID of this thread; a fresh random UUID string for new threads.
    id: acp::SessionId,
    /// ID of the user prompt that initiated the current request.
    prompt_id: PromptId,
    /// Timestamp of the last update; set to the current time on creation.
    updated_at: DateTime<Utc>,
    /// Thread title, if one has been set or generated.
    title: Option<SharedString>,
    /// In-flight title generation task, if any.
    pending_title_generation: Option<Task<()>>,
    /// In-flight (shareable) summary generation task, if any.
    pending_summary_generation: Option<Shared<Task<Option<SharedString>>>>,
    /// Thread summary, if one has been generated.
    summary: Option<SharedString>,
    /// The message history.
    messages: Vec<Message>,
    /// User store obtained from the project.
    user_store: Entity<UserStore>,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// Flag indicating the UI has a queued message waiting to be sent.
    /// Used to signal that the turn should end at the next message boundary.
    has_queued_message: bool,
    /// The agent message currently being assembled, if any.
    /// NOTE(review): inferred from the name; its handling is outside this view.
    pending_message: Option<AgentMessage>,
    /// Tools available to this thread, keyed by tool name.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// Token usage recorded per user message.
    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Registry of context servers.
    context_server_registry: Entity<ContextServerRegistry>,
    /// Active agent profile; initialized from the default profile in `AgentSettings`.
    profile_id: AgentProfileId,
    /// Project context entity.
    project_context: Entity<ProjectContext>,
    /// Prompt templates.
    templates: Arc<Templates>,
    /// The language model used for this thread, if one is selected.
    model: Option<Arc<dyn LanguageModel>>,
    /// Model used for summarization, if any.
    summarization_model: Option<Arc<dyn LanguageModel>>,
    /// Whether thinking is enabled; `true` for new threads.
    thinking_enabled: bool,
    /// Sending half of the watch channel that carries the prompt capabilities.
    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
    /// Receiving half of the prompt-capabilities watch channel.
    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
    /// The project this thread operates on.
    pub(crate) project: Entity<Project>,
    /// Action log created for this thread's project.
    pub(crate) action_log: Entity<ActionLog>,
    /// Tracks the last time files were read by the agent, to detect external modifications
    pub(crate) file_read_times: HashMap<PathBuf, fs::MTime>,
    /// True if this thread was imported from a shared thread and can be synced.
    imported: bool,
    /// If this is a subagent thread, contains context about the parent
    subagent_context: Option<SubagentContext>,
    /// Weak references to running subagent threads for cancellation propagation
    running_subagents: Vec<WeakEntity<Thread>>,
}
 762
 763impl Thread {
 764    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 765        let image = model.map_or(true, |model| model.supports_images());
 766        acp::PromptCapabilities::new()
 767            .image(image)
 768            .embedded_context(true)
 769    }
 770
 771    pub fn new(
 772        project: Entity<Project>,
 773        project_context: Entity<ProjectContext>,
 774        context_server_registry: Entity<ContextServerRegistry>,
 775        templates: Arc<Templates>,
 776        model: Option<Arc<dyn LanguageModel>>,
 777        cx: &mut Context<Self>,
 778    ) -> Self {
 779        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 780        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 781        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 782            watch::channel(Self::prompt_capabilities(model.as_deref()));
 783        Self {
 784            id: acp::SessionId::new(uuid::Uuid::new_v4().to_string()),
 785            prompt_id: PromptId::new(),
 786            updated_at: Utc::now(),
 787            title: None,
 788            pending_title_generation: None,
 789            pending_summary_generation: None,
 790            summary: None,
 791            messages: Vec::new(),
 792            user_store: project.read(cx).user_store(),
 793            running_turn: None,
 794            has_queued_message: false,
 795            pending_message: None,
 796            tools: BTreeMap::default(),
 797            request_token_usage: HashMap::default(),
 798            cumulative_token_usage: TokenUsage::default(),
 799            initial_project_snapshot: {
 800                let project_snapshot = Self::project_snapshot(project.clone(), cx);
 801                cx.foreground_executor()
 802                    .spawn(async move { Some(project_snapshot.await) })
 803                    .shared()
 804            },
 805            context_server_registry,
 806            profile_id,
 807            project_context,
 808            templates,
 809            model,
 810            summarization_model: None,
 811            thinking_enabled: true,
 812            prompt_capabilities_tx,
 813            prompt_capabilities_rx,
 814            project,
 815            action_log,
 816            file_read_times: HashMap::default(),
 817            imported: false,
 818            subagent_context: None,
 819            running_subagents: Vec::new(),
 820        }
 821    }
 822
 823    pub fn new_subagent(
 824        project: Entity<Project>,
 825        project_context: Entity<ProjectContext>,
 826        context_server_registry: Entity<ContextServerRegistry>,
 827        templates: Arc<Templates>,
 828        model: Arc<dyn LanguageModel>,
 829        subagent_context: SubagentContext,
 830        parent_tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
 831        cx: &mut Context<Self>,
 832    ) -> Self {
 833        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 834        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 835        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 836            watch::channel(Self::prompt_capabilities(Some(model.as_ref())));
 837
 838        // Rebind tools that hold thread references to use this subagent's thread
 839        // instead of the parent's thread. This is critical for tools like EditFileTool
 840        // that make model requests using the thread's ID.
 841        let weak_self = cx.weak_entity();
 842        let tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>> = parent_tools
 843            .into_iter()
 844            .map(|(name, tool)| {
 845                let rebound = tool.rebind_thread(weak_self.clone()).unwrap_or(tool);
 846                (name, rebound)
 847            })
 848            .collect();
 849
 850        Self {
 851            id: acp::SessionId::new(uuid::Uuid::new_v4().to_string()),
 852            prompt_id: PromptId::new(),
 853            updated_at: Utc::now(),
 854            title: None,
 855            pending_title_generation: None,
 856            pending_summary_generation: None,
 857            summary: None,
 858            messages: Vec::new(),
 859            user_store: project.read(cx).user_store(),
 860            running_turn: None,
 861            has_queued_message: false,
 862            pending_message: None,
 863            tools,
 864            request_token_usage: HashMap::default(),
 865            cumulative_token_usage: TokenUsage::default(),
 866            initial_project_snapshot: Task::ready(None).shared(),
 867            context_server_registry,
 868            profile_id,
 869            project_context,
 870            templates,
 871            model: Some(model),
 872            summarization_model: None,
 873            thinking_enabled: true,
 874            prompt_capabilities_tx,
 875            prompt_capabilities_rx,
 876            project,
 877            action_log,
 878            file_read_times: HashMap::default(),
 879            imported: false,
 880            subagent_context: Some(subagent_context),
 881            running_subagents: Vec::new(),
 882        }
 883    }
 884
    /// Returns the session id that identifies this thread.
    pub fn id(&self) -> &acp::SessionId {
        &self.id
    }
 888
    /// Returns true if this thread was imported from a shared thread.
    ///
    /// Threads built by the in-memory constructors start with this flag unset;
    /// threads restored via [`Thread::from_db`] take it from the stored record.
    pub fn is_imported(&self) -> bool {
        self.imported
    }
 893
 894    pub fn replay(
 895        &mut self,
 896        cx: &mut Context<Self>,
 897    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 898        let (tx, rx) = mpsc::unbounded();
 899        let stream = ThreadEventStream(tx);
 900        for message in &self.messages {
 901            match message {
 902                Message::User(user_message) => stream.send_user_message(user_message),
 903                Message::Agent(assistant_message) => {
 904                    for content in &assistant_message.content {
 905                        match content {
 906                            AgentMessageContent::Text(text) => stream.send_text(text),
 907                            AgentMessageContent::Thinking { text, .. } => {
 908                                stream.send_thinking(text)
 909                            }
 910                            AgentMessageContent::RedactedThinking(_) => {}
 911                            AgentMessageContent::ToolUse(tool_use) => {
 912                                self.replay_tool_call(
 913                                    tool_use,
 914                                    assistant_message.tool_results.get(&tool_use.id),
 915                                    &stream,
 916                                    cx,
 917                                );
 918                            }
 919                        }
 920                    }
 921                }
 922                Message::Resume => {}
 923            }
 924        }
 925        rx
 926    }
 927
 928    fn replay_tool_call(
 929        &self,
 930        tool_use: &LanguageModelToolUse,
 931        tool_result: Option<&LanguageModelToolResult>,
 932        stream: &ThreadEventStream,
 933        cx: &mut Context<Self>,
 934    ) {
 935        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 936            self.context_server_registry
 937                .read(cx)
 938                .servers()
 939                .find_map(|(_, tools)| {
 940                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 941                        Some(tool.clone())
 942                    } else {
 943                        None
 944                    }
 945                })
 946        });
 947
 948        let Some(tool) = tool else {
 949            stream
 950                .0
 951                .unbounded_send(Ok(ThreadEvent::ToolCall(
 952                    acp::ToolCall::new(tool_use.id.to_string(), tool_use.name.to_string())
 953                        .status(acp::ToolCallStatus::Failed)
 954                        .raw_input(tool_use.input.clone()),
 955                )))
 956                .ok();
 957            return;
 958        };
 959
 960        let title = tool.initial_title(tool_use.input.clone(), cx);
 961        let kind = tool.kind();
 962        stream.send_tool_call(
 963            &tool_use.id,
 964            &tool_use.name,
 965            title,
 966            kind,
 967            tool_use.input.clone(),
 968        );
 969
 970        let output = tool_result
 971            .as_ref()
 972            .and_then(|result| result.output.clone());
 973        if let Some(output) = output.clone() {
 974            // For replay, we use a dummy cancellation receiver since the tool already completed
 975            let (_cancellation_tx, cancellation_rx) = watch::channel(false);
 976            let tool_event_stream = ToolCallEventStream::new(
 977                tool_use.id.clone(),
 978                stream.clone(),
 979                Some(self.project.read(cx).fs().clone()),
 980                cancellation_rx,
 981            );
 982            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 983                .log_err();
 984        }
 985
 986        stream.update_tool_call_fields(
 987            &tool_use.id,
 988            acp::ToolCallUpdateFields::new()
 989                .status(
 990                    tool_result
 991                        .as_ref()
 992                        .map_or(acp::ToolCallStatus::Failed, |result| {
 993                            if result.is_error {
 994                                acp::ToolCallStatus::Failed
 995                            } else {
 996                                acp::ToolCallStatus::Completed
 997                            }
 998                        }),
 999                )
1000                .raw_output(output),
1001        );
1002    }
1003
1004    pub fn from_db(
1005        id: acp::SessionId,
1006        db_thread: DbThread,
1007        project: Entity<Project>,
1008        project_context: Entity<ProjectContext>,
1009        context_server_registry: Entity<ContextServerRegistry>,
1010        templates: Arc<Templates>,
1011        cx: &mut Context<Self>,
1012    ) -> Self {
1013        let profile_id = db_thread
1014            .profile
1015            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
1016
1017        let mut model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1018            db_thread
1019                .model
1020                .and_then(|model| {
1021                    let model = SelectedModel {
1022                        provider: model.provider.clone().into(),
1023                        model: model.model.into(),
1024                    };
1025                    registry.select_model(&model, cx)
1026                })
1027                .or_else(|| registry.default_model())
1028                .map(|model| model.model)
1029        });
1030
1031        if model.is_none() {
1032            model = Self::resolve_profile_model(&profile_id, cx);
1033        }
1034        if model.is_none() {
1035            model = LanguageModelRegistry::global(cx).update(cx, |registry, _cx| {
1036                registry.default_model().map(|model| model.model)
1037            });
1038        }
1039
1040        let (prompt_capabilities_tx, prompt_capabilities_rx) =
1041            watch::channel(Self::prompt_capabilities(model.as_deref()));
1042
1043        let action_log = cx.new(|_| ActionLog::new(project.clone()));
1044
1045        Self {
1046            id,
1047            prompt_id: PromptId::new(),
1048            title: if db_thread.title.is_empty() {
1049                None
1050            } else {
1051                Some(db_thread.title.clone())
1052            },
1053            pending_title_generation: None,
1054            pending_summary_generation: None,
1055            summary: db_thread.detailed_summary,
1056            messages: db_thread.messages,
1057            user_store: project.read(cx).user_store(),
1058            running_turn: None,
1059            has_queued_message: false,
1060            pending_message: None,
1061            tools: BTreeMap::default(),
1062            request_token_usage: db_thread.request_token_usage.clone(),
1063            cumulative_token_usage: db_thread.cumulative_token_usage,
1064            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
1065            context_server_registry,
1066            profile_id,
1067            project_context,
1068            templates,
1069            model,
1070            summarization_model: None,
1071            // TODO: Persist this on the `DbThread`.
1072            thinking_enabled: true,
1073            project,
1074            action_log,
1075            updated_at: db_thread.updated_at,
1076            prompt_capabilities_tx,
1077            prompt_capabilities_rx,
1078            file_read_times: HashMap::default(),
1079            imported: db_thread.imported,
1080            subagent_context: None,
1081            running_subagents: Vec::new(),
1082        }
1083    }
1084
1085    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
1086        let initial_project_snapshot = self.initial_project_snapshot.clone();
1087        let mut thread = DbThread {
1088            title: self.title(),
1089            messages: self.messages.clone(),
1090            updated_at: self.updated_at,
1091            detailed_summary: self.summary.clone(),
1092            initial_project_snapshot: None,
1093            cumulative_token_usage: self.cumulative_token_usage,
1094            request_token_usage: self.request_token_usage.clone(),
1095            model: self.model.as_ref().map(|model| DbLanguageModel {
1096                provider: model.provider_id().to_string(),
1097                model: model.name().0.to_string(),
1098            }),
1099            profile: Some(self.profile_id.clone()),
1100            imported: self.imported,
1101        };
1102
1103        cx.background_spawn(async move {
1104            let initial_project_snapshot = initial_project_snapshot.await;
1105            thread.initial_project_snapshot = initial_project_snapshot;
1106            thread
1107        })
1108    }
1109
1110    /// Create a snapshot of the current project state including git information and unsaved buffers.
1111    fn project_snapshot(
1112        project: Entity<Project>,
1113        cx: &mut Context<Self>,
1114    ) -> Task<Arc<ProjectSnapshot>> {
1115        let task = project::telemetry_snapshot::TelemetrySnapshot::new(&project, cx);
1116        cx.spawn(async move |_, _| {
1117            let snapshot = task.await;
1118
1119            Arc::new(ProjectSnapshot {
1120                worktree_snapshots: snapshot.worktree_snapshots,
1121                timestamp: Utc::now(),
1122            })
1123        })
1124    }
1125
    /// Returns the project context entity this thread was created with.
    pub fn project_context(&self) -> &Entity<ProjectContext> {
        &self.project_context
    }
1129
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1133
    /// Returns the action log entity owned by this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1137
    /// Returns true when the thread has neither messages nor a title.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty() && self.title.is_none()
    }
1141
    /// Returns the language model currently selected for this thread, if any.
    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
        self.model.as_ref()
    }
1145
1146    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
1147        let old_usage = self.latest_token_usage();
1148        self.model = Some(model);
1149        let new_caps = Self::prompt_capabilities(self.model.as_deref());
1150        let new_usage = self.latest_token_usage();
1151        if old_usage != new_usage {
1152            cx.emit(TokenUsageUpdated(new_usage));
1153        }
1154        self.prompt_capabilities_tx.send(new_caps).log_err();
1155        cx.notify()
1156    }
1157
    /// Returns the model configured for summarization, if one has been set.
    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
        self.summarization_model.as_ref()
    }
1161
    /// Sets (or clears, with `None`) the summarization model and notifies observers.
    pub fn set_summarization_model(
        &mut self,
        model: Option<Arc<dyn LanguageModel>>,
        cx: &mut Context<Self>,
    ) {
        self.summarization_model = model;
        cx.notify()
    }
1170
    /// Returns whether thinking is enabled for this thread.
    pub fn thinking_enabled(&self) -> bool {
        self.thinking_enabled
    }
1174
    /// Enables or disables thinking for this thread and notifies observers.
    pub fn set_thinking_enabled(&mut self, enabled: bool, cx: &mut Context<Self>) {
        self.thinking_enabled = enabled;
        cx.notify();
    }
1179
1180    pub fn last_message(&self) -> Option<Message> {
1181        if let Some(message) = self.pending_message.clone() {
1182            Some(Message::Agent(message))
1183        } else {
1184            self.messages.last().cloned()
1185        }
1186    }
1187
1188    pub fn add_default_tools(
1189        &mut self,
1190        environment: Rc<dyn ThreadEnvironment>,
1191        cx: &mut Context<Self>,
1192    ) {
1193        let language_registry = self.project.read(cx).languages().clone();
1194        self.add_tool(CopyPathTool::new(self.project.clone()));
1195        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
1196        self.add_tool(DeletePathTool::new(
1197            self.project.clone(),
1198            self.action_log.clone(),
1199        ));
1200        self.add_tool(DiagnosticsTool::new(self.project.clone()));
1201        self.add_tool(EditFileTool::new(
1202            self.project.clone(),
1203            cx.weak_entity(),
1204            language_registry.clone(),
1205            Templates::new(),
1206        ));
1207        self.add_tool(StreamingEditFileTool::new(
1208            self.project.clone(),
1209            cx.weak_entity(),
1210            language_registry,
1211            Templates::new(),
1212        ));
1213        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
1214        self.add_tool(FindPathTool::new(self.project.clone()));
1215        self.add_tool(GrepTool::new(self.project.clone()));
1216        self.add_tool(ListDirectoryTool::new(self.project.clone()));
1217        self.add_tool(MovePathTool::new(self.project.clone()));
1218        self.add_tool(NowTool);
1219        self.add_tool(OpenTool::new(self.project.clone()));
1220        self.add_tool(ReadFileTool::new(
1221            cx.weak_entity(),
1222            self.project.clone(),
1223            self.action_log.clone(),
1224        ));
1225        self.add_tool(SaveFileTool::new(self.project.clone()));
1226        self.add_tool(RestoreFileFromDiskTool::new(self.project.clone()));
1227        self.add_tool(TerminalTool::new(self.project.clone(), environment));
1228        self.add_tool(ThinkingTool);
1229        self.add_tool(WebSearchTool);
1230
1231        if cx.has_flag::<SubagentsFeatureFlag>() && self.depth() < MAX_SUBAGENT_DEPTH {
1232            let parent_tools = self.tools.clone();
1233            self.add_tool(SubagentTool::new(
1234                cx.weak_entity(),
1235                self.project.clone(),
1236                self.project_context.clone(),
1237                self.context_server_registry.clone(),
1238                self.templates.clone(),
1239                self.depth(),
1240                parent_tools,
1241            ));
1242        }
1243    }
1244
    /// Registers `tool` under the name returned by `T::name()`.
    ///
    /// The tool is stored in type-erased form, keyed by that name.
    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
        self.tools.insert(T::name().into(), tool.erase());
    }
1248
    /// Unregisters the tool called `name`, returning true if one was registered.
    pub fn remove_tool(&mut self, name: &str) -> bool {
        self.tools.remove(name).is_some()
    }
1252
    /// Drops every registered tool whose name is not contained in `allowed`.
    pub fn restrict_tools(&mut self, allowed: &collections::HashSet<SharedString>) {
        self.tools.retain(|name, _| allowed.contains(name));
    }
1256
    /// Returns the id of the agent profile this thread currently uses.
    pub fn profile(&self) -> &AgentProfileId {
        &self.profile_id
    }
1260
1261    pub fn set_profile(&mut self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
1262        if self.profile_id == profile_id {
1263            return;
1264        }
1265
1266        self.profile_id = profile_id;
1267
1268        // Swap to the profile's preferred model when available.
1269        if let Some(model) = Self::resolve_profile_model(&self.profile_id, cx) {
1270            self.set_model(model, cx);
1271        }
1272    }
1273
1274    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1275        for subagent in self.running_subagents.drain(..) {
1276            if let Some(subagent) = subagent.upgrade() {
1277                subagent.update(cx, |thread, cx| thread.cancel(cx)).detach();
1278            }
1279        }
1280
1281        let Some(running_turn) = self.running_turn.take() else {
1282            self.flush_pending_message(cx);
1283            return Task::ready(());
1284        };
1285
1286        let turn_task = running_turn.cancel();
1287
1288        cx.spawn(async move |this, cx| {
1289            turn_task.await;
1290            this.update(cx, |this, cx| {
1291                this.flush_pending_message(cx);
1292            })
1293            .ok();
1294        })
1295    }
1296
    /// Records whether a message is queued for this thread.
    pub fn set_has_queued_message(&mut self, has_queued: bool) {
        self.has_queued_message = has_queued;
    }
1300
    /// Returns the flag last stored via [`Thread::set_has_queued_message`].
    pub fn has_queued_message(&self) -> bool {
        self.has_queued_message
    }
1304
1305    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1306        let Some(last_user_message) = self.last_user_message() else {
1307            return;
1308        };
1309
1310        self.request_token_usage
1311            .insert(last_user_message.id.clone(), update);
1312        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1313        cx.notify();
1314    }
1315
1316    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1317        self.cancel(cx).detach();
1318        // Clear pending message since cancel will try to flush it asynchronously,
1319        // and we don't want that content to be added after we truncate
1320        self.pending_message.take();
1321        let Some(position) = self.messages.iter().position(
1322            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1323        ) else {
1324            return Err(anyhow!("Message not found"));
1325        };
1326
1327        for message in self.messages.drain(position..) {
1328            match message {
1329                Message::User(message) => {
1330                    self.request_token_usage.remove(&message.id);
1331                }
1332                Message::Agent(_) | Message::Resume => {}
1333            }
1334        }
1335        self.clear_summary();
1336        cx.notify();
1337        Ok(())
1338    }
1339
1340    pub fn latest_request_token_usage(&self) -> Option<language_model::TokenUsage> {
1341        let last_user_message = self.last_user_message()?;
1342        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1343        Some(*tokens)
1344    }
1345
1346    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1347        let usage = self.latest_request_token_usage()?;
1348        let model = self.model.clone()?;
1349        Some(acp_thread::TokenUsage {
1350            max_tokens: model.max_token_count(),
1351            used_tokens: usage.total_tokens(),
1352            input_tokens: usage.input_tokens,
1353            output_tokens: usage.output_tokens,
1354        })
1355    }
1356
1357    /// Get the total input token count as of the message before the given message.
1358    ///
1359    /// Returns `None` if:
1360    /// - `target_id` is the first message (no previous message)
1361    /// - The previous message hasn't received a response yet (no usage data)
1362    /// - `target_id` is not found in the messages
1363    pub fn tokens_before_message(&self, target_id: &UserMessageId) -> Option<u64> {
1364        let mut previous_user_message_id: Option<&UserMessageId> = None;
1365
1366        for message in &self.messages {
1367            if let Message::User(user_msg) = message {
1368                if &user_msg.id == target_id {
1369                    let prev_id = previous_user_message_id?;
1370                    let usage = self.request_token_usage.get(prev_id)?;
1371                    return Some(usage.input_tokens);
1372                }
1373                previous_user_message_id = Some(&user_msg.id);
1374            }
1375        }
1376        None
1377    }
1378
1379    /// Look up the active profile and resolve its preferred model if one is configured.
1380    fn resolve_profile_model(
1381        profile_id: &AgentProfileId,
1382        cx: &mut Context<Self>,
1383    ) -> Option<Arc<dyn LanguageModel>> {
1384        let selection = AgentSettings::get_global(cx)
1385            .profiles
1386            .get(profile_id)?
1387            .default_model
1388            .clone()?;
1389        Self::resolve_model_from_selection(&selection, cx)
1390    }
1391
1392    /// Translate a stored model selection into the configured model from the registry.
1393    fn resolve_model_from_selection(
1394        selection: &LanguageModelSelection,
1395        cx: &mut Context<Self>,
1396    ) -> Option<Arc<dyn LanguageModel>> {
1397        let selected = SelectedModel {
1398            provider: LanguageModelProviderId::from(selection.provider.0.clone()),
1399            model: LanguageModelId::from(selection.model.clone()),
1400        };
1401        LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1402            registry
1403                .select_model(&selected, cx)
1404                .map(|configured| configured.model)
1405        })
1406    }
1407
1408    pub fn resume(
1409        &mut self,
1410        cx: &mut Context<Self>,
1411    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1412        self.messages.push(Message::Resume);
1413        cx.notify();
1414
1415        log::debug!("Total messages in thread: {}", self.messages.len());
1416        self.run_turn(cx)
1417    }
1418
1419    /// Sending a message results in the model streaming a response, which could include tool calls.
1420    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1421    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1422    pub fn send<T>(
1423        &mut self,
1424        id: UserMessageId,
1425        content: impl IntoIterator<Item = T>,
1426        cx: &mut Context<Self>,
1427    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1428    where
1429        T: Into<UserMessageContent>,
1430    {
1431        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1432        log::debug!("Thread::send content: {:?}", content);
1433
1434        self.messages
1435            .push(Message::User(UserMessage { id, content }));
1436        cx.notify();
1437
1438        self.send_existing(cx)
1439    }
1440
1441    pub fn send_existing(
1442        &mut self,
1443        cx: &mut Context<Self>,
1444    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1445        let model = self.model().context("No language model configured")?;
1446
1447        log::info!("Thread::send called with model: {}", model.name().0);
1448        self.advance_prompt_id();
1449
1450        log::debug!("Total messages in thread: {}", self.messages.len());
1451        self.run_turn(cx)
1452    }
1453
1454    pub fn push_acp_user_block(
1455        &mut self,
1456        id: UserMessageId,
1457        blocks: impl IntoIterator<Item = acp::ContentBlock>,
1458        path_style: PathStyle,
1459        cx: &mut Context<Self>,
1460    ) {
1461        let content = blocks
1462            .into_iter()
1463            .map(|block| UserMessageContent::from_content_block(block, path_style))
1464            .collect::<Vec<_>>();
1465        self.messages
1466            .push(Message::User(UserMessage { id, content }));
1467        cx.notify();
1468    }
1469
1470    pub fn push_acp_agent_block(&mut self, block: acp::ContentBlock, cx: &mut Context<Self>) {
1471        let text = match block {
1472            acp::ContentBlock::Text(text_content) => text_content.text,
1473            acp::ContentBlock::Image(_) => "[image]".to_string(),
1474            acp::ContentBlock::Audio(_) => "[audio]".to_string(),
1475            acp::ContentBlock::ResourceLink(resource_link) => resource_link.uri,
1476            acp::ContentBlock::Resource(resource) => match resource.resource {
1477                acp::EmbeddedResourceResource::TextResourceContents(resource) => resource.uri,
1478                acp::EmbeddedResourceResource::BlobResourceContents(resource) => resource.uri,
1479                _ => "[resource]".to_string(),
1480            },
1481            _ => "[unknown]".to_string(),
1482        };
1483
1484        self.messages.push(Message::Agent(AgentMessage {
1485            content: vec![AgentMessageContent::Text(text)],
1486            ..Default::default()
1487        }));
1488        cx.notify();
1489    }
1490
    /// Starts a turn on the current message history without adding a message.
    ///
    /// Only compiled when the `eval` feature is enabled.
    #[cfg(feature = "eval")]
    pub fn proceed(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.run_turn(cx)
    }
1498
    /// Starts a new agent turn and returns the receiver for its events.
    ///
    /// Any previous turn is cancelled first. The spawned task drives
    /// `run_turn_internal` and, unless the turn was cancelled, flushes the
    /// pending message, reports the outcome on the event stream (end of turn,
    /// refusal, max tokens, or an error) and clears `running_turn`.
    ///
    /// # Errors
    ///
    /// Returns an error when no language model is configured or the current
    /// profile cannot be found in the settings. Note that the previous turn
    /// has already been cancelled by then.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        // Flush the old pending message synchronously before cancelling,
        // to avoid a race where the detached cancel task might flush the NEW
        // turn's pending message instead of the old one.
        self.flush_pending_message(cx);
        self.cancel(cx).detach();

        let model = self.model.clone().context("No language model configured")?;
        let profile = AgentSettings::get_global(cx)
            .profiles
            .get(&self.profile_id)
            .context("Profile not found")?;
        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
        let event_stream = ThreadEventStream(events_tx);
        // Index of the last message at turn start; on a refusal the history is
        // truncated to this length, dropping that message and anything after it.
        let message_ix = self.messages.len().saturating_sub(1);
        self.clear_summary();
        let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
        self.running_turn = Some(RunningTurn {
            event_stream: event_stream.clone(),
            tools: self.enabled_tools(profile, &model, cx),
            cancellation_tx,
            _task: cx.spawn(async move |this, cx| {
                log::debug!("Starting agent turn execution");

                let turn_result = Self::run_turn_internal(
                    &this,
                    model,
                    &event_stream,
                    cancellation_rx.clone(),
                    cx,
                )
                .await;

                // Check if we were cancelled - if so, cancel() already took running_turn
                // and we shouldn't touch it (it might be a NEW turn now)
                let was_cancelled = *cancellation_rx.borrow();
                if was_cancelled {
                    log::debug!("Turn was cancelled, skipping cleanup");
                    return;
                }

                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));

                match turn_result {
                    Ok(()) => {
                        log::debug!("Turn execution completed");
                        event_stream.send_stop(acp::StopReason::EndTurn);
                    }
                    Err(error) => {
                        log::error!("Turn execution failed: {:?}", error);
                        // Map known completion errors to stop reasons; anything
                        // else is forwarded as an error event.
                        match error.downcast::<CompletionError>() {
                            Ok(CompletionError::Refusal) => {
                                event_stream.send_stop(acp::StopReason::Refusal);
                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
                            }
                            Ok(CompletionError::MaxTokens) => {
                                event_stream.send_stop(acp::StopReason::MaxTokens);
                            }
                            Ok(CompletionError::Other(error)) | Err(error) => {
                                event_stream.send_error(error);
                            }
                        }
                    }
                }

                // The turn finished on its own, so release the running-turn slot.
                _ = this.update(cx, |this, _| this.running_turn.take());
            }),
        });
        Ok(events_rx)
    }
1572
    /// Drives a single agent turn: repeatedly requests a completion from
    /// `model`, applies the streamed events to the thread, runs any tools the
    /// model asked for, and loops until the turn ends.
    ///
    /// The loop exits with `Ok(())` when the turn is cancelled, when a
    /// completion finishes without producing any tool-result tasks, or when
    /// tools ran but a queued message is waiting. Completion errors are handed
    /// to `handle_completion_error`; if it returns a retry status the request
    /// is re-issued after the delay, otherwise its error is propagated.
    async fn run_turn_internal(
        this: &WeakEntity<Self>,
        model: Arc<dyn LanguageModel>,
        event_stream: &ThreadEventStream,
        mut cancellation_rx: watch::Receiver<bool>,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        // Number of consecutive failed attempts for the current request; reset
        // to 0 once a completion succeeds and tool results are sent back.
        let mut attempt = 0;
        let mut intent = CompletionIntent::UserPrompt;
        loop {
            let request =
                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;

            telemetry::event!(
                "Agent Thread Completion",
                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
                model = model.telemetry_id(),
                model_provider = model.provider_id().to_string(),
                attempt
            );

            log::debug!("Calling model.stream_completion, attempt {}", attempt);

            // If the request itself fails, continue with an empty stream and
            // remember the error so the retry logic below can handle it.
            let (mut events, mut error) = match model.stream_completion(request, cx).await {
                Ok(events) => (events.fuse(), None),
                Err(err) => (stream::empty().boxed().fuse(), Some(err)),
            };
            let mut tool_results = FuturesUnordered::new();
            let mut cancelled = false;
            loop {
                // Race between getting the first event and cancellation
                let first_event = futures::select! {
                    event = events.next().fuse() => event,
                    _ = cancellation_rx.changed().fuse() => {
                        if *cancellation_rx.borrow() {
                            cancelled = true;
                            break;
                        }
                        // The watch fired but the flag is not set; keep waiting.
                        continue;
                    }
                };
                // `None` means the completion stream is exhausted.
                let Some(first_event) = first_event else {
                    break;
                };

                // Collect all immediately available events to process as a batch
                let mut batch = vec![first_event];
                while let Some(event) = events.next().now_or_never().flatten() {
                    batch.push(event);
                }

                // Process the batch in a single update
                let batch_result = this.update(cx, |this, cx| {
                    let mut batch_tool_results = Vec::new();
                    let mut batch_error = None;

                    // Stop at the first failing event; tool tasks collected so
                    // far are still returned alongside the error.
                    for event in batch {
                        log::trace!("Received completion event: {:?}", event);
                        match event {
                            Ok(event) => {
                                match this.handle_completion_event(
                                    event,
                                    event_stream,
                                    cancellation_rx.clone(),
                                    cx,
                                ) {
                                    Ok(Some(task)) => batch_tool_results.push(task),
                                    Ok(None) => {}
                                    Err(err) => {
                                        batch_error = Some(err);
                                        break;
                                    }
                                }
                            }
                            Err(err) => {
                                batch_error = Some(err.into());
                                break;
                            }
                        }
                    }

                    cx.notify();
                    (batch_tool_results, batch_error)
                })?;

                tool_results.extend(batch_result.0);
                if let Some(err) = batch_result.1 {
                    // Errors that are not the completion error type handled by
                    // the retry logic fail the downcast and are returned to the
                    // caller immediately via `?`.
                    error = Some(err.downcast()?);
                    break;
                }
            }

            // Drop the stream to release the rate limit permit before tool execution.
            // The stream holds a semaphore guard that limits concurrent requests.
            // Without this, the permit would be held during potentially long-running
            // tool execution, which could cause deadlocks when tools spawn subagents
            // that need their own permits.
            drop(events);

            // Captured before draining: no tool tasks means the model did not
            // request any tool whose result must be sent back.
            let end_turn = tool_results.is_empty();
            while let Some(tool_result) = tool_results.next().await {
                log::debug!("Tool finished {:?}", tool_result);

                event_stream.update_tool_call_fields(
                    &tool_result.tool_use_id,
                    acp::ToolCallUpdateFields::new()
                        .status(if tool_result.is_error {
                            acp::ToolCallStatus::Failed
                        } else {
                            acp::ToolCallStatus::Completed
                        })
                        .raw_output(tool_result.output.clone()),
                );
                this.update(cx, |this, _cx| {
                    this.pending_message()
                        .tool_results
                        .insert(tool_result.tool_use_id.clone(), tool_result);
                })?;
            }

            // Commit the pending agent message and start title generation if
            // the thread has no title and none is being generated.
            this.update(cx, |this, cx| {
                this.flush_pending_message(cx);
                if this.title.is_none() && this.pending_title_generation.is_none() {
                    this.generate_title(cx);
                }
            })?;

            if cancelled {
                log::debug!("Turn cancelled by user, exiting");
                return Ok(());
            }

            if let Some(error) = error {
                attempt += 1;
                // `??` propagates both a dropped entity and a "do not retry"
                // decision from `handle_completion_error`.
                let retry = this.update(cx, |this, cx| {
                    let user_store = this.user_store.read(cx);
                    this.handle_completion_error(error, attempt, user_store.plan())
                })??;
                // Create the timer before notifying listeners, then wait out
                // the retry delay.
                let timer = cx.background_executor().timer(retry.duration);
                event_stream.send_retry(retry);
                timer.await;
                // If the last message is an agent message without tool results,
                // append a `Resume` marker and retry with the user-prompt intent.
                this.update(cx, |this, _cx| {
                    if let Some(Message::Agent(message)) = this.messages.last() {
                        if message.tool_results.is_empty() {
                            intent = CompletionIntent::UserPrompt;
                            this.messages.push(Message::Resume);
                        }
                    }
                })?;
            } else if end_turn {
                return Ok(());
            } else {
                let has_queued = this.update(cx, |this, _| this.has_queued_message())?;
                if has_queued {
                    log::debug!("Queued message found, ending turn at message boundary");
                    return Ok(());
                }
                // Tools ran: send their results back in the next request.
                intent = CompletionIntent::ToolResults;
                attempt = 0;
            }
        }
    }
1736
1737    fn handle_completion_error(
1738        &mut self,
1739        error: LanguageModelCompletionError,
1740        attempt: u8,
1741        plan: Option<Plan>,
1742    ) -> Result<acp_thread::RetryStatus> {
1743        let Some(model) = self.model.as_ref() else {
1744            return Err(anyhow!(error));
1745        };
1746
1747        let auto_retry = if model.provider_id() == ZED_CLOUD_PROVIDER_ID {
1748            match plan {
1749                Some(Plan::V2(_)) => true,
1750                None => false,
1751            }
1752        } else {
1753            true
1754        };
1755
1756        if !auto_retry {
1757            return Err(anyhow!(error));
1758        }
1759
1760        let Some(strategy) = Self::retry_strategy_for(&error) else {
1761            return Err(anyhow!(error));
1762        };
1763
1764        let max_attempts = match &strategy {
1765            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1766            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1767        };
1768
1769        if attempt > max_attempts {
1770            return Err(anyhow!(error));
1771        }
1772
1773        let delay = match &strategy {
1774            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1775                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1776                Duration::from_secs(delay_secs)
1777            }
1778            RetryStrategy::Fixed { delay, .. } => *delay,
1779        };
1780        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1781
1782        Ok(acp_thread::RetryStatus {
1783            last_error: error.to_string().into(),
1784            attempt: attempt as usize,
1785            max_attempts: max_attempts as usize,
1786            started_at: Instant::now(),
1787            duration: delay,
1788        })
1789    }
1790
1791    /// A helper method that's called on every streamed completion event.
1792    /// Returns an optional tool result task, which the main agentic loop will
1793    /// send back to the model when it resolves.
1794    fn handle_completion_event(
1795        &mut self,
1796        event: LanguageModelCompletionEvent,
1797        event_stream: &ThreadEventStream,
1798        cancellation_rx: watch::Receiver<bool>,
1799        cx: &mut Context<Self>,
1800    ) -> Result<Option<Task<LanguageModelToolResult>>> {
1801        log::trace!("Handling streamed completion event: {:?}", event);
1802        use LanguageModelCompletionEvent::*;
1803
1804        match event {
1805            StartMessage { .. } => {
1806                self.flush_pending_message(cx);
1807                self.pending_message = Some(AgentMessage::default());
1808            }
1809            Text(new_text) => self.handle_text_event(new_text, event_stream),
1810            Thinking { text, signature } => {
1811                self.handle_thinking_event(text, signature, event_stream)
1812            }
1813            RedactedThinking { data } => self.handle_redacted_thinking_event(data),
1814            ReasoningDetails(details) => {
1815                let last_message = self.pending_message();
1816                // Store the last non-empty reasoning_details (overwrites earlier ones)
1817                // This ensures we keep the encrypted reasoning with signatures, not the early text reasoning
1818                if let serde_json::Value::Array(ref arr) = details {
1819                    if !arr.is_empty() {
1820                        last_message.reasoning_details = Some(details);
1821                    }
1822                } else {
1823                    last_message.reasoning_details = Some(details);
1824                }
1825            }
1826            ToolUse(tool_use) => {
1827                return Ok(self.handle_tool_use_event(tool_use, event_stream, cancellation_rx, cx));
1828            }
1829            ToolUseJsonParseError {
1830                id,
1831                tool_name,
1832                raw_input,
1833                json_parse_error,
1834            } => {
1835                return Ok(Some(Task::ready(
1836                    self.handle_tool_use_json_parse_error_event(
1837                        id,
1838                        tool_name,
1839                        raw_input,
1840                        json_parse_error,
1841                    ),
1842                )));
1843            }
1844            UsageUpdate(usage) => {
1845                telemetry::event!(
1846                    "Agent Thread Completion Usage Updated",
1847                    thread_id = self.id.to_string(),
1848                    prompt_id = self.prompt_id.to_string(),
1849                    model = self.model.as_ref().map(|m| m.telemetry_id()),
1850                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
1851                    input_tokens = usage.input_tokens,
1852                    output_tokens = usage.output_tokens,
1853                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
1854                    cache_read_input_tokens = usage.cache_read_input_tokens,
1855                );
1856                self.update_token_usage(usage, cx);
1857            }
1858            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
1859            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
1860            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
1861            Started | Queued { .. } => {}
1862        }
1863
1864        Ok(None)
1865    }
1866
1867    fn handle_text_event(&mut self, new_text: String, event_stream: &ThreadEventStream) {
1868        event_stream.send_text(&new_text);
1869
1870        let last_message = self.pending_message();
1871        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1872            text.push_str(&new_text);
1873        } else {
1874            last_message
1875                .content
1876                .push(AgentMessageContent::Text(new_text));
1877        }
1878    }
1879
1880    fn handle_thinking_event(
1881        &mut self,
1882        new_text: String,
1883        new_signature: Option<String>,
1884        event_stream: &ThreadEventStream,
1885    ) {
1886        event_stream.send_thinking(&new_text);
1887
1888        let last_message = self.pending_message();
1889        if let Some(AgentMessageContent::Thinking { text, signature }) =
1890            last_message.content.last_mut()
1891        {
1892            text.push_str(&new_text);
1893            *signature = new_signature.or(signature.take());
1894        } else {
1895            last_message.content.push(AgentMessageContent::Thinking {
1896                text: new_text,
1897                signature: new_signature,
1898            });
1899        }
1900    }
1901
1902    fn handle_redacted_thinking_event(&mut self, data: String) {
1903        let last_message = self.pending_message();
1904        last_message
1905            .content
1906            .push(AgentMessageContent::RedactedThinking(data));
1907    }
1908
1909    fn handle_tool_use_event(
1910        &mut self,
1911        tool_use: LanguageModelToolUse,
1912        event_stream: &ThreadEventStream,
1913        cancellation_rx: watch::Receiver<bool>,
1914        cx: &mut Context<Self>,
1915    ) -> Option<Task<LanguageModelToolResult>> {
1916        cx.notify();
1917
1918        let tool = self.tool(tool_use.name.as_ref());
1919        let mut title = SharedString::from(&tool_use.name);
1920        let mut kind = acp::ToolKind::Other;
1921        if let Some(tool) = tool.as_ref() {
1922            title = tool.initial_title(tool_use.input.clone(), cx);
1923            kind = tool.kind();
1924        }
1925
1926        // Ensure the last message ends in the current tool use
1927        let last_message = self.pending_message();
1928        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
1929            if let AgentMessageContent::ToolUse(last_tool_use) = content {
1930                if last_tool_use.id == tool_use.id {
1931                    *last_tool_use = tool_use.clone();
1932                    false
1933                } else {
1934                    true
1935                }
1936            } else {
1937                true
1938            }
1939        });
1940
1941        if push_new_tool_use {
1942            event_stream.send_tool_call(
1943                &tool_use.id,
1944                &tool_use.name,
1945                title,
1946                kind,
1947                tool_use.input.clone(),
1948            );
1949            last_message
1950                .content
1951                .push(AgentMessageContent::ToolUse(tool_use.clone()));
1952        } else {
1953            event_stream.update_tool_call_fields(
1954                &tool_use.id,
1955                acp::ToolCallUpdateFields::new()
1956                    .title(title.as_str())
1957                    .kind(kind)
1958                    .raw_input(tool_use.input.clone()),
1959            );
1960        }
1961
1962        if !tool_use.is_input_complete {
1963            return None;
1964        }
1965
1966        let Some(tool) = tool else {
1967            let content = format!("No tool named {} exists", tool_use.name);
1968            return Some(Task::ready(LanguageModelToolResult {
1969                content: LanguageModelToolResultContent::Text(Arc::from(content)),
1970                tool_use_id: tool_use.id,
1971                tool_name: tool_use.name,
1972                is_error: true,
1973                output: None,
1974            }));
1975        };
1976
1977        let fs = self.project.read(cx).fs().clone();
1978        let tool_event_stream = ToolCallEventStream::new(
1979            tool_use.id.clone(),
1980            event_stream.clone(),
1981            Some(fs),
1982            cancellation_rx,
1983        );
1984        tool_event_stream.update_fields(
1985            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress),
1986        );
1987        let supports_images = self.model().is_some_and(|model| model.supports_images());
1988        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
1989        log::debug!("Running tool {}", tool_use.name);
1990        Some(cx.foreground_executor().spawn(async move {
1991            let tool_result = tool_result.await.and_then(|output| {
1992                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
1993                    && !supports_images
1994                {
1995                    return Err(anyhow!(
1996                        "Attempted to read an image, but this model doesn't support it.",
1997                    ));
1998                }
1999                Ok(output)
2000            });
2001
2002            match tool_result {
2003                Ok(output) => LanguageModelToolResult {
2004                    tool_use_id: tool_use.id,
2005                    tool_name: tool_use.name,
2006                    is_error: false,
2007                    content: output.llm_output,
2008                    output: Some(output.raw_output),
2009                },
2010                Err(error) => LanguageModelToolResult {
2011                    tool_use_id: tool_use.id,
2012                    tool_name: tool_use.name,
2013                    is_error: true,
2014                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
2015                    output: Some(error.to_string().into()),
2016                },
2017            }
2018        }))
2019    }
2020
2021    fn handle_tool_use_json_parse_error_event(
2022        &mut self,
2023        tool_use_id: LanguageModelToolUseId,
2024        tool_name: Arc<str>,
2025        raw_input: Arc<str>,
2026        json_parse_error: String,
2027    ) -> LanguageModelToolResult {
2028        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
2029        LanguageModelToolResult {
2030            tool_use_id,
2031            tool_name,
2032            is_error: true,
2033            content: LanguageModelToolResultContent::Text(tool_output.into()),
2034            output: Some(serde_json::Value::String(raw_input.to_string())),
2035        }
2036    }
2037
2038    pub fn title(&self) -> SharedString {
2039        self.title.clone().unwrap_or("New Thread".into())
2040    }
2041
    /// Returns `true` while a summary generation task is stored on the thread.
    pub fn is_generating_summary(&self) -> bool {
        self.pending_summary_generation.is_some()
    }
2045
    /// Returns `true` while a title generation task is stored on the thread.
    pub fn is_generating_title(&self) -> bool {
        self.pending_title_generation.is_some()
    }
2049
2050    pub fn summary(&mut self, cx: &mut Context<Self>) -> Shared<Task<Option<SharedString>>> {
2051        if let Some(summary) = self.summary.as_ref() {
2052            return Task::ready(Some(summary.clone())).shared();
2053        }
2054        if let Some(task) = self.pending_summary_generation.clone() {
2055            return task;
2056        }
2057        let Some(model) = self.summarization_model.clone() else {
2058            log::error!("No summarization model available");
2059            return Task::ready(None).shared();
2060        };
2061        let mut request = LanguageModelRequest {
2062            intent: Some(CompletionIntent::ThreadContextSummarization),
2063            temperature: AgentSettings::temperature_for_model(&model, cx),
2064            ..Default::default()
2065        };
2066
2067        for message in &self.messages {
2068            request.messages.extend(message.to_request());
2069        }
2070
2071        request.messages.push(LanguageModelRequestMessage {
2072            role: Role::User,
2073            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
2074            cache: false,
2075            reasoning_details: None,
2076        });
2077
2078        let task = cx
2079            .spawn(async move |this, cx| {
2080                let mut summary = String::new();
2081                let mut messages = model.stream_completion(request, cx).await.log_err()?;
2082                while let Some(event) = messages.next().await {
2083                    let event = event.log_err()?;
2084                    let text = match event {
2085                        LanguageModelCompletionEvent::Text(text) => text,
2086                        _ => continue,
2087                    };
2088
2089                    let mut lines = text.lines();
2090                    summary.extend(lines.next());
2091                }
2092
2093                log::debug!("Setting summary: {}", summary);
2094                let summary = SharedString::from(summary);
2095
2096                this.update(cx, |this, cx| {
2097                    this.summary = Some(summary.clone());
2098                    this.pending_summary_generation = None;
2099                    cx.notify()
2100                })
2101                .ok()?;
2102
2103                Some(summary)
2104            })
2105            .shared();
2106        self.pending_summary_generation = Some(task.clone());
2107        task
2108    }
2109
2110    pub fn generate_title(&mut self, cx: &mut Context<Self>) {
2111        let Some(model) = self.summarization_model.clone() else {
2112            return;
2113        };
2114
2115        log::debug!(
2116            "Generating title with model: {:?}",
2117            self.summarization_model.as_ref().map(|model| model.name())
2118        );
2119        let mut request = LanguageModelRequest {
2120            intent: Some(CompletionIntent::ThreadSummarization),
2121            temperature: AgentSettings::temperature_for_model(&model, cx),
2122            ..Default::default()
2123        };
2124
2125        for message in &self.messages {
2126            request.messages.extend(message.to_request());
2127        }
2128
2129        request.messages.push(LanguageModelRequestMessage {
2130            role: Role::User,
2131            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
2132            cache: false,
2133            reasoning_details: None,
2134        });
2135        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
2136            let mut title = String::new();
2137
2138            let generate = async {
2139                let mut messages = model.stream_completion(request, cx).await?;
2140                while let Some(event) = messages.next().await {
2141                    let event = event?;
2142                    let text = match event {
2143                        LanguageModelCompletionEvent::Text(text) => text,
2144                        _ => continue,
2145                    };
2146
2147                    let mut lines = text.lines();
2148                    title.extend(lines.next());
2149
2150                    // Stop if the LLM generated multiple lines.
2151                    if lines.next().is_some() {
2152                        break;
2153                    }
2154                }
2155                anyhow::Ok(())
2156            };
2157
2158            if generate.await.context("failed to generate title").is_ok() {
2159                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
2160            }
2161            _ = this.update(cx, |this, _| this.pending_title_generation = None);
2162        }));
2163    }
2164
2165    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
2166        self.pending_title_generation = None;
2167        if Some(&title) != self.title.as_ref() {
2168            self.title = Some(title);
2169            cx.emit(TitleUpdated);
2170            cx.notify();
2171        }
2172    }
2173
2174    fn clear_summary(&mut self) {
2175        self.summary = None;
2176        self.pending_summary_generation = None;
2177    }
2178
2179    fn last_user_message(&self) -> Option<&UserMessage> {
2180        self.messages
2181            .iter()
2182            .rev()
2183            .find_map(|message| match message {
2184                Message::User(user_message) => Some(user_message),
2185                Message::Agent(_) => None,
2186                Message::Resume => None,
2187            })
2188    }
2189
2190    fn pending_message(&mut self) -> &mut AgentMessage {
2191        self.pending_message.get_or_insert_default()
2192    }
2193
2194    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
2195        let Some(mut message) = self.pending_message.take() else {
2196            return;
2197        };
2198
2199        if message.content.is_empty() {
2200            return;
2201        }
2202
2203        for content in &message.content {
2204            let AgentMessageContent::ToolUse(tool_use) = content else {
2205                continue;
2206            };
2207
2208            if !message.tool_results.contains_key(&tool_use.id) {
2209                message.tool_results.insert(
2210                    tool_use.id.clone(),
2211                    LanguageModelToolResult {
2212                        tool_use_id: tool_use.id.clone(),
2213                        tool_name: tool_use.name.clone(),
2214                        is_error: true,
2215                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
2216                        output: None,
2217                    },
2218                );
2219            }
2220        }
2221
2222        self.messages.push(Message::Agent(message));
2223        self.updated_at = Utc::now();
2224        self.clear_summary();
2225        cx.notify()
2226    }
2227
2228    pub(crate) fn build_completion_request(
2229        &self,
2230        completion_intent: CompletionIntent,
2231        cx: &App,
2232    ) -> Result<LanguageModelRequest> {
2233        let model = self.model().context("No language model configured")?;
2234        let tools = if let Some(turn) = self.running_turn.as_ref() {
2235            turn.tools
2236                .iter()
2237                .filter_map(|(tool_name, tool)| {
2238                    log::trace!("Including tool: {}", tool_name);
2239                    Some(LanguageModelRequestTool {
2240                        name: tool_name.to_string(),
2241                        description: tool.description().to_string(),
2242                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
2243                    })
2244                })
2245                .collect::<Vec<_>>()
2246        } else {
2247            Vec::new()
2248        };
2249
2250        log::debug!("Building completion request");
2251        log::debug!("Completion intent: {:?}", completion_intent);
2252
2253        let available_tools: Vec<_> = self
2254            .running_turn
2255            .as_ref()
2256            .map(|turn| turn.tools.keys().cloned().collect())
2257            .unwrap_or_default();
2258
2259        log::debug!("Request includes {} tools", available_tools.len());
2260        let messages = self.build_request_messages(available_tools, cx);
2261        log::debug!("Request will include {} messages", messages.len());
2262
2263        let request = LanguageModelRequest {
2264            thread_id: Some(self.id.to_string()),
2265            prompt_id: Some(self.prompt_id.to_string()),
2266            intent: Some(completion_intent),
2267            messages,
2268            tools,
2269            tool_choice: None,
2270            stop: Vec::new(),
2271            temperature: AgentSettings::temperature_for_model(model, cx),
2272            thinking_allowed: self.thinking_enabled,
2273        };
2274
2275        log::debug!("Completion request built successfully");
2276        Ok(request)
2277    }
2278
2279    fn enabled_tools(
2280        &self,
2281        profile: &AgentProfileSettings,
2282        model: &Arc<dyn LanguageModel>,
2283        cx: &App,
2284    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
2285        fn truncate(tool_name: &SharedString) -> SharedString {
2286            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
2287                let mut truncated = tool_name.to_string();
2288                truncated.truncate(MAX_TOOL_NAME_LENGTH);
2289                truncated.into()
2290            } else {
2291                tool_name.clone()
2292            }
2293        }
2294
2295        let use_streaming_edit_tool =
2296            cx.has_flag::<AgentV2FeatureFlag>() && model.supports_streaming_tools();
2297
2298        let mut tools = self
2299            .tools
2300            .iter()
2301            .filter_map(|(tool_name, tool)| {
2302                // For streaming_edit_file, check profile against "edit_file" since that's what users configure
2303                let profile_tool_name = if tool_name == "streaming_edit_file" {
2304                    "edit_file"
2305                } else {
2306                    tool_name.as_ref()
2307                };
2308
2309                if tool.supports_provider(&model.provider_id())
2310                    && profile.is_tool_enabled(profile_tool_name)
2311                {
2312                    match (tool_name.as_ref(), use_streaming_edit_tool) {
2313                        ("streaming_edit_file", false) | ("edit_file", true) => None,
2314                        ("streaming_edit_file", true) => {
2315                            // Expose streaming tool as "edit_file"
2316                            Some((SharedString::from("edit_file"), tool.clone()))
2317                        }
2318                        _ => Some((truncate(tool_name), tool.clone())),
2319                    }
2320                } else {
2321                    None
2322                }
2323            })
2324            .collect::<BTreeMap<_, _>>();
2325
2326        let mut context_server_tools = Vec::new();
2327        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
2328        let mut duplicate_tool_names = HashSet::default();
2329        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
2330            for (tool_name, tool) in server_tools {
2331                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
2332                    let tool_name = truncate(tool_name);
2333                    if !seen_tools.insert(tool_name.clone()) {
2334                        duplicate_tool_names.insert(tool_name.clone());
2335                    }
2336                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
2337                }
2338            }
2339        }
2340
2341        // When there are duplicate tool names, disambiguate by prefixing them
2342        // with the server ID. In the rare case there isn't enough space for the
2343        // disambiguated tool name, keep only the last tool with this name.
2344        for (server_id, tool_name, tool) in context_server_tools {
2345            if duplicate_tool_names.contains(&tool_name) {
2346                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
2347                if available >= 2 {
2348                    let mut disambiguated = server_id.0.to_string();
2349                    disambiguated.truncate(available - 1);
2350                    disambiguated.push('_');
2351                    disambiguated.push_str(&tool_name);
2352                    tools.insert(disambiguated.into(), tool.clone());
2353                } else {
2354                    tools.insert(tool_name, tool.clone());
2355                }
2356            } else {
2357                tools.insert(tool_name, tool.clone());
2358            }
2359        }
2360
2361        tools
2362    }
2363
2364    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
2365        self.running_turn.as_ref()?.tools.get(name).cloned()
2366    }
2367
2368    pub fn has_tool(&self, name: &str) -> bool {
2369        self.running_turn
2370            .as_ref()
2371            .is_some_and(|turn| turn.tools.contains_key(name))
2372    }
2373
    /// Reports whether `self.tools` contains an entry named `name`.
    ///
    /// Unlike [`Self::has_tool`], this checks the thread's own tool map rather
    /// than the tools of the running turn. Only compiled for tests and the
    /// `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    pub fn has_registered_tool(&self, name: &str) -> bool {
        self.tools.contains_key(name)
    }
2378
2379    pub fn registered_tool_names(&self) -> Vec<SharedString> {
2380        self.tools.keys().cloned().collect()
2381    }
2382
    /// Appends `subagent` to this thread's `running_subagents` list.
    ///
    /// Only a weak handle is stored, so registering a subagent does not keep
    /// it alive.
    pub fn register_running_subagent(&mut self, subagent: WeakEntity<Thread>) {
        self.running_subagents.push(subagent);
    }
2386
2387    pub fn unregister_running_subagent(&mut self, subagent: &WeakEntity<Thread>) {
2388        self.running_subagents
2389            .retain(|s| s.entity_id() != subagent.entity_id());
2390    }
2391
2392    pub fn running_subagent_count(&self) -> usize {
2393        self.running_subagents
2394            .iter()
2395            .filter(|s| s.upgrade().is_some())
2396            .count()
2397    }
2398
    /// Returns `true` when this thread has a `subagent_context`.
    pub fn is_subagent(&self) -> bool {
        self.subagent_context.is_some()
    }
2402
2403    pub fn depth(&self) -> u8 {
2404        self.subagent_context.as_ref().map(|c| c.depth).unwrap_or(0)
2405    }
2406
    /// Returns `true` when no turn is currently running, i.e. `running_turn`
    /// is `None`.
    pub fn is_turn_complete(&self) -> bool {
        self.running_turn.is_none()
    }
2410
2411    pub fn submit_user_message(
2412        &mut self,
2413        content: impl Into<String>,
2414        cx: &mut Context<Self>,
2415    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2416        let content = content.into();
2417        self.messages.push(Message::User(UserMessage {
2418            id: UserMessageId::new(),
2419            content: vec![UserMessageContent::Text(content)],
2420        }));
2421        cx.notify();
2422        self.send_existing(cx)
2423    }
2424
2425    pub fn interrupt_for_summary(
2426        &mut self,
2427        cx: &mut Context<Self>,
2428    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2429        let context = self
2430            .subagent_context
2431            .as_ref()
2432            .context("Not a subagent thread")?;
2433        let prompt = context.context_low_prompt.clone();
2434        self.cancel(cx).detach();
2435        self.submit_user_message(prompt, cx)
2436    }
2437
2438    pub fn request_final_summary(
2439        &mut self,
2440        cx: &mut Context<Self>,
2441    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2442        let context = self
2443            .subagent_context
2444            .as_ref()
2445            .context("Not a subagent thread")?;
2446        let prompt = context.summary_prompt.clone();
2447        self.submit_user_message(prompt, cx)
2448    }
2449
2450    fn build_request_messages(
2451        &self,
2452        available_tools: Vec<SharedString>,
2453        cx: &App,
2454    ) -> Vec<LanguageModelRequestMessage> {
2455        log::trace!(
2456            "Building request messages from {} thread messages",
2457            self.messages.len()
2458        );
2459
2460        let system_prompt = SystemPromptTemplate {
2461            project: self.project_context.read(cx),
2462            available_tools,
2463            model_name: self.model.as_ref().map(|m| m.name().0.to_string()),
2464        }
2465        .render(&self.templates)
2466        .context("failed to build system prompt")
2467        .expect("Invalid template");
2468        let mut messages = vec![LanguageModelRequestMessage {
2469            role: Role::System,
2470            content: vec![system_prompt.into()],
2471            cache: false,
2472            reasoning_details: None,
2473        }];
2474        for message in &self.messages {
2475            messages.extend(message.to_request());
2476        }
2477
2478        if let Some(last_message) = messages.last_mut() {
2479            last_message.cache = true;
2480        }
2481
2482        if let Some(message) = self.pending_message.as_ref() {
2483            messages.extend(message.to_request());
2484        }
2485
2486        messages
2487    }
2488
2489    pub fn to_markdown(&self) -> String {
2490        let mut markdown = String::new();
2491        for (ix, message) in self.messages.iter().enumerate() {
2492            if ix > 0 {
2493                markdown.push('\n');
2494            }
2495            markdown.push_str(&message.to_markdown());
2496        }
2497
2498        if let Some(message) = self.pending_message.as_ref() {
2499            markdown.push('\n');
2500            markdown.push_str(&message.to_markdown());
2501        }
2502
2503        markdown
2504    }
2505
    /// Replaces `prompt_id` with a freshly generated `PromptId`.
    fn advance_prompt_id(&mut self) {
        self.prompt_id = PromptId::new();
    }
2509
2510    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
2511        use LanguageModelCompletionError::*;
2512        use http_client::StatusCode;
2513
2514        // General strategy here:
2515        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
2516        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to 4 times with exponential backoff.
2517        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry up to 3 times.
2518        match error {
2519            HttpResponseError {
2520                status_code: StatusCode::TOO_MANY_REQUESTS,
2521                ..
2522            } => Some(RetryStrategy::ExponentialBackoff {
2523                initial_delay: BASE_RETRY_DELAY,
2524                max_attempts: MAX_RETRY_ATTEMPTS,
2525            }),
2526            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
2527                Some(RetryStrategy::Fixed {
2528                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2529                    max_attempts: MAX_RETRY_ATTEMPTS,
2530                })
2531            }
2532            UpstreamProviderError {
2533                status,
2534                retry_after,
2535                ..
2536            } => match *status {
2537                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
2538                    Some(RetryStrategy::Fixed {
2539                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2540                        max_attempts: MAX_RETRY_ATTEMPTS,
2541                    })
2542                }
2543                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
2544                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2545                    // Internal Server Error could be anything, retry up to 3 times.
2546                    max_attempts: 3,
2547                }),
2548                status => {
2549                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
2550                    // but we frequently get them in practice. See https://http.dev/529
2551                    if status.as_u16() == 529 {
2552                        Some(RetryStrategy::Fixed {
2553                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2554                            max_attempts: MAX_RETRY_ATTEMPTS,
2555                        })
2556                    } else {
2557                        Some(RetryStrategy::Fixed {
2558                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2559                            max_attempts: 2,
2560                        })
2561                    }
2562                }
2563            },
2564            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
2565                delay: BASE_RETRY_DELAY,
2566                max_attempts: 3,
2567            }),
2568            ApiReadResponseError { .. }
2569            | HttpSend { .. }
2570            | DeserializeResponse { .. }
2571            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
2572                delay: BASE_RETRY_DELAY,
2573                max_attempts: 3,
2574            }),
2575            // Retrying these errors definitely shouldn't help.
2576            HttpResponseError {
2577                status_code:
2578                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
2579                ..
2580            }
2581            | AuthenticationError { .. }
2582            | PermissionError { .. }
2583            | NoApiKey { .. }
2584            | ApiEndpointNotFound { .. }
2585            | PromptTooLarge { .. } => None,
2586            // These errors might be transient, so retry them
2587            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
2588                delay: BASE_RETRY_DELAY,
2589                max_attempts: 1,
2590            }),
2591            // Retry all other 4xx and 5xx errors once.
2592            HttpResponseError { status_code, .. }
2593                if status_code.is_client_error() || status_code.is_server_error() =>
2594            {
2595                Some(RetryStrategy::Fixed {
2596                    delay: BASE_RETRY_DELAY,
2597                    max_attempts: 3,
2598                })
2599            }
2600            Other(err) if err.is::<language_model::PaymentRequiredError>() => {
2601                // Retrying won't help for Payment Required errors.
2602                None
2603            }
2604            // Conservatively assume that any other errors are non-retryable
2605            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
2606                delay: BASE_RETRY_DELAY,
2607                max_attempts: 2,
2608            }),
2609        }
2610    }
2611}
2612
/// State kept for a turn that is currently in progress.
///
/// Consuming it through `cancel` signals cancellation to the turn's tools and
/// reports a final cancellation event on `event_stream`.
struct RunningTurn {
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    _task: Task<()>,
    /// The current event stream for the running turn. Used to report a final
    /// cancellation event if we cancel the turn.
    event_stream: ThreadEventStream,
    /// The tools that were enabled for this turn.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// Sender to signal tool cancellation. When cancel is called, this is
    /// set to true so all tools can detect user-initiated cancellation.
    cancellation_tx: watch::Sender<bool>,
}
2627
2628impl RunningTurn {
2629    fn cancel(mut self) -> Task<()> {
2630        log::debug!("Cancelling in progress turn");
2631        self.cancellation_tx.send(true).ok();
2632        self.event_stream.send_canceled();
2633        self._task
2634    }
2635}
2636
/// Event type that [`Thread`] is declared to emit; carries an optional
/// `acp_thread::TokenUsage`.
// NOTE(review): presumably `None` means no usage is available — confirm at the emit sites.
pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);

impl EventEmitter<TokenUsageUpdated> for Thread {}
2640
/// Payload-free event type that [`Thread`] is declared to emit.
// NOTE(review): presumably emitted when the thread title changes — confirm at the emit sites.
pub struct TitleUpdated;

impl EventEmitter<TitleUpdated> for Thread {}
2644
2645pub trait AgentTool
2646where
2647    Self: 'static + Sized,
2648{
2649    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
2650    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
2651
2652    fn name() -> &'static str;
2653
2654    fn description() -> SharedString {
2655        let schema = schemars::schema_for!(Self::Input);
2656        SharedString::new(
2657            schema
2658                .get("description")
2659                .and_then(|description| description.as_str())
2660                .unwrap_or_default(),
2661        )
2662    }
2663
2664    fn kind() -> acp::ToolKind;
2665
2666    /// The initial tool title to display. Can be updated during the tool run.
2667    fn initial_title(
2668        &self,
2669        input: Result<Self::Input, serde_json::Value>,
2670        cx: &mut App,
2671    ) -> SharedString;
2672
2673    /// Returns the JSON schema that describes the tool's input.
2674    fn input_schema(format: LanguageModelToolSchemaFormat) -> Schema {
2675        language_model::tool_schema::root_schema_for::<Self::Input>(format)
2676    }
2677
2678    /// Some tools rely on a provider for the underlying billing or other reasons.
2679    /// Allow the tool to check if they are compatible, or should be filtered out.
2680    fn supports_provider(_provider: &LanguageModelProviderId) -> bool {
2681        true
2682    }
2683
2684    /// Runs the tool with the provided input.
2685    fn run(
2686        self: Arc<Self>,
2687        input: Self::Input,
2688        event_stream: ToolCallEventStream,
2689        cx: &mut App,
2690    ) -> Task<Result<Self::Output>>;
2691
2692    /// Emits events for a previous execution of the tool.
2693    fn replay(
2694        &self,
2695        _input: Self::Input,
2696        _output: Self::Output,
2697        _event_stream: ToolCallEventStream,
2698        _cx: &mut App,
2699    ) -> Result<()> {
2700        Ok(())
2701    }
2702
2703    fn erase(self) -> Arc<dyn AnyAgentTool> {
2704        Arc::new(Erased(Arc::new(self)))
2705    }
2706
2707    /// Create a new instance of this tool bound to a different thread.
2708    /// This is used when creating subagents, so that tools like EditFileTool
2709    /// that hold a thread reference will use the subagent's thread instead
2710    /// of the parent's thread.
2711    /// Returns None if the tool doesn't need rebinding (most tools).
2712    fn rebind_thread(&self, _new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
2713        None
2714    }
2715}
2716
/// Newtype wrapper used for type erasure: `AnyAgentTool` is implemented for
/// `Erased<Arc<T>>` where `T: AgentTool`, and `AgentTool::erase` constructs it.
pub struct Erased<T>(T);
2718
/// Output of running a type-erased tool (see `AnyAgentTool::run`).
pub struct AgentToolOutput {
    /// The tool's output converted into content for the language model.
    pub llm_output: LanguageModelToolResultContent,
    /// The tool's output serialized to JSON.
    pub raw_output: serde_json::Value,
}
2723
/// Type-erased view of an agent tool, usable as `dyn AnyAgentTool`.
///
/// Inputs and outputs are exchanged as `serde_json::Value` rather than as
/// concrete types.
pub trait AnyAgentTool {
    /// The tool's name.
    fn name(&self) -> SharedString;
    /// The tool's description.
    fn description(&self) -> SharedString;
    /// The `acp::ToolKind` reported for this tool.
    fn kind(&self) -> acp::ToolKind;
    /// The initial title to display for a tool call with the given JSON `input`.
    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
    /// The JSON schema of the tool's input, produced for the given schema `format`.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
    /// Whether the tool may be used with the given provider. The default
    /// accepts every provider.
    fn supports_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }
    /// Runs the tool with a JSON `input`, resolving to its output in both
    /// model-facing and raw JSON form.
    fn run(
        self: Arc<Self>,
        input: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<AgentToolOutput>>;
    /// Emits events for a previous execution of the tool, given the JSON
    /// `input` and `output` of that execution.
    fn replay(
        &self,
        input: serde_json::Value,
        output: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Result<()>;
    /// Create a new instance of this tool bound to a different thread.
    /// This is used when creating subagents, so that tools like EditFileTool
    /// that hold a thread reference will use the subagent's thread instead
    /// of the parent's thread.
    /// Returns None if the tool doesn't need rebinding (most tools).
    fn rebind_thread(&self, _new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
        None
    }
}
2755
2756impl<T> AnyAgentTool for Erased<Arc<T>>
2757where
2758    T: AgentTool,
2759{
2760    fn name(&self) -> SharedString {
2761        T::name().into()
2762    }
2763
2764    fn description(&self) -> SharedString {
2765        T::description()
2766    }
2767
2768    fn kind(&self) -> agent_client_protocol::ToolKind {
2769        T::kind()
2770    }
2771
2772    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2773        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2774        self.0.initial_title(parsed_input, _cx)
2775    }
2776
2777    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2778        let mut json = serde_json::to_value(T::input_schema(format))?;
2779        language_model::tool_schema::adapt_schema_to_format(&mut json, format)?;
2780        Ok(json)
2781    }
2782
2783    fn supports_provider(&self, provider: &LanguageModelProviderId) -> bool {
2784        T::supports_provider(provider)
2785    }
2786
2787    fn run(
2788        self: Arc<Self>,
2789        input: serde_json::Value,
2790        event_stream: ToolCallEventStream,
2791        cx: &mut App,
2792    ) -> Task<Result<AgentToolOutput>> {
2793        cx.spawn(async move |cx| {
2794            let input = serde_json::from_value(input)?;
2795            let output = cx
2796                .update(|cx| self.0.clone().run(input, event_stream, cx))
2797                .await?;
2798            let raw_output = serde_json::to_value(&output)?;
2799            Ok(AgentToolOutput {
2800                llm_output: output.into(),
2801                raw_output,
2802            })
2803        })
2804    }
2805
2806    fn replay(
2807        &self,
2808        input: serde_json::Value,
2809        output: serde_json::Value,
2810        event_stream: ToolCallEventStream,
2811        cx: &mut App,
2812    ) -> Result<()> {
2813        let input = serde_json::from_value(input)?;
2814        let output = serde_json::from_value(output)?;
2815        self.0.replay(input, output, event_stream, cx)
2816    }
2817
2818    fn rebind_thread(&self, new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
2819        self.0.rebind_thread(new_thread)
2820    }
2821}
2822
/// Cloneable wrapper around the unbounded sender on which a thread reports
/// `Result<ThreadEvent>` items.
#[derive(Clone)]
struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2825
2826impl ThreadEventStream {
2827    fn send_user_message(&self, message: &UserMessage) {
2828        self.0
2829            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2830            .ok();
2831    }
2832
2833    fn send_text(&self, text: &str) {
2834        self.0
2835            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2836            .ok();
2837    }
2838
2839    fn send_thinking(&self, text: &str) {
2840        self.0
2841            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2842            .ok();
2843    }
2844
2845    fn send_tool_call(
2846        &self,
2847        id: &LanguageModelToolUseId,
2848        tool_name: &str,
2849        title: SharedString,
2850        kind: acp::ToolKind,
2851        input: serde_json::Value,
2852    ) {
2853        self.0
2854            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2855                id,
2856                tool_name,
2857                title.to_string(),
2858                kind,
2859                input,
2860            ))))
2861            .ok();
2862    }
2863
2864    fn initial_tool_call(
2865        id: &LanguageModelToolUseId,
2866        tool_name: &str,
2867        title: String,
2868        kind: acp::ToolKind,
2869        input: serde_json::Value,
2870    ) -> acp::ToolCall {
2871        acp::ToolCall::new(id.to_string(), title)
2872            .kind(kind)
2873            .raw_input(input)
2874            .meta(acp_thread::meta_with_tool_name(tool_name))
2875    }
2876
2877    fn update_tool_call_fields(
2878        &self,
2879        tool_use_id: &LanguageModelToolUseId,
2880        fields: acp::ToolCallUpdateFields,
2881    ) {
2882        self.0
2883            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2884                acp::ToolCallUpdate::new(tool_use_id.to_string(), fields).into(),
2885            )))
2886            .ok();
2887    }
2888
2889    fn send_retry(&self, status: acp_thread::RetryStatus) {
2890        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2891    }
2892
2893    fn send_stop(&self, reason: acp::StopReason) {
2894        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2895    }
2896
2897    fn send_canceled(&self) {
2898        self.0
2899            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2900            .ok();
2901    }
2902
2903    fn send_error(&self, error: impl Into<anyhow::Error>) {
2904        self.0.unbounded_send(Err(error.into())).ok();
2905    }
2906}
2907
/// Handle given to a running tool for reporting events about its own tool call
/// and for observing user cancellation.
#[derive(Clone)]
pub struct ToolCallEventStream {
    /// Id of the tool use that events sent through this stream refer to.
    tool_use_id: LanguageModelToolUseId,
    /// The thread event stream the events are sent on.
    stream: ThreadEventStream,
    /// File system handle used to write "always allow/deny" choices to the
    /// settings file; when `None`, those choices are not persisted.
    fs: Option<Arc<dyn Fs>>,
    /// Receiver of the cancellation flag; `true` means the user cancelled.
    cancellation_rx: watch::Receiver<bool>,
}
2915
2916impl ToolCallEventStream {
    /// Test-only constructor: like [`Self::test_with_cancellation`], but without
    /// handing out the cancellation sender (it is dropped when this function
    /// returns).
    #[cfg(any(test, feature = "test-support"))]
    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
        let (stream, receiver, _cancellation_tx) = Self::test_with_cancellation();
        (stream, receiver)
    }
2922
2923    #[cfg(any(test, feature = "test-support"))]
2924    pub fn test_with_cancellation() -> (Self, ToolCallEventStreamReceiver, watch::Sender<bool>) {
2925        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2926        let (cancellation_tx, cancellation_rx) = watch::channel(false);
2927
2928        let stream = ToolCallEventStream::new(
2929            "test_id".into(),
2930            ThreadEventStream(events_tx),
2931            None,
2932            cancellation_rx,
2933        );
2934
2935        (
2936            stream,
2937            ToolCallEventStreamReceiver(events_rx),
2938            cancellation_tx,
2939        )
2940    }
2941
    /// Signal cancellation for this event stream. Only available in tests.
    ///
    /// Sends `true` on `cancellation_tx`, the sender obtained from
    /// [`Self::test_with_cancellation`]; a failed send is ignored.
    #[cfg(any(test, feature = "test-support"))]
    pub fn signal_cancellation_with_sender(cancellation_tx: &mut watch::Sender<bool>) {
        cancellation_tx.send(true).ok();
    }
2947
    /// Bundles the given parts into a `ToolCallEventStream` without any further
    /// processing.
    fn new(
        tool_use_id: LanguageModelToolUseId,
        stream: ThreadEventStream,
        fs: Option<Arc<dyn Fs>>,
        cancellation_rx: watch::Receiver<bool>,
    ) -> Self {
        Self {
            tool_use_id,
            stream,
            fs,
            cancellation_rx,
        }
    }
2961
2962    /// Returns a future that resolves when the user cancels the tool call.
2963    /// Tools should select on this alongside their main work to detect user cancellation.
2964    pub fn cancelled_by_user(&self) -> impl std::future::Future<Output = ()> + '_ {
2965        let mut rx = self.cancellation_rx.clone();
2966        async move {
2967            loop {
2968                if *rx.borrow() {
2969                    return;
2970                }
2971                if rx.changed().await.is_err() {
2972                    // Sender dropped, will never be cancelled
2973                    std::future::pending::<()>().await;
2974                }
2975            }
2976        }
2977    }
2978
    /// Returns true if the user has cancelled this tool call.
    /// This is useful for checking cancellation state after an operation completes,
    /// to determine if the completion was due to user cancellation.
    ///
    /// This reads the current value of the cancellation flag without waiting.
    pub fn was_cancelled_by_user(&self) -> bool {
        // NOTE(review): the receiver is cloned before borrowing, presumably
        // because `borrow` needs a mutable receiver while `self` is shared — confirm.
        *self.cancellation_rx.clone().borrow()
    }
2985
    /// Returns the id of the tool use this stream reports on.
    pub fn tool_use_id(&self) -> &LanguageModelToolUseId {
        &self.tool_use_id
    }
2989
    /// Reports updated `fields` for this stream's tool call on the underlying
    /// thread event stream.
    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
        self.stream
            .update_tool_call_fields(&self.tool_use_id, fields);
    }
2994
2995    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2996        self.stream
2997            .0
2998            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2999                acp_thread::ToolCallUpdateDiff {
3000                    id: acp::ToolCallId::new(self.tool_use_id.to_string()),
3001                    diff,
3002                }
3003                .into(),
3004            )))
3005            .ok();
3006    }
3007
3008    pub fn update_subagent_thread(&self, thread: Entity<acp_thread::AcpThread>) {
3009        self.stream
3010            .0
3011            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
3012                acp_thread::ToolCallUpdateSubagentThread {
3013                    id: acp::ToolCallId::new(self.tool_use_id.to_string()),
3014                    thread,
3015                }
3016                .into(),
3017            )))
3018            .ok();
3019    }
3020
3021    /// Authorize a third-party tool (e.g., MCP tool from a context server).
3022    ///
3023    /// Unlike built-in tools, third-party tools don't support pattern-based permissions.
3024    /// They only support `default_mode` (allow/deny/confirm) per tool.
3025    ///
3026    /// Uses the dropdown authorization flow with two granularities:
3027    /// - "Always for <display_name> MCP tool" → sets `tools.<tool_id>.default_mode = "allow"` or "deny"
3028    /// - "Only this time" → allow/deny once
3029    pub fn authorize_third_party_tool(
3030        &self,
3031        title: impl Into<String>,
3032        tool_id: String,
3033        display_name: String,
3034        cx: &mut App,
3035    ) -> Task<Result<()>> {
3036        let settings = agent_settings::AgentSettings::get_global(cx);
3037
3038        let decision = decide_permission_from_settings(&tool_id, "", &settings);
3039
3040        match decision {
3041            ToolPermissionDecision::Allow => return Task::ready(Ok(())),
3042            ToolPermissionDecision::Deny(reason) => return Task::ready(Err(anyhow!(reason))),
3043            ToolPermissionDecision::Confirm => {}
3044        }
3045
3046        let (response_tx, response_rx) = oneshot::channel();
3047        self.stream
3048            .0
3049            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3050                ToolCallAuthorization {
3051                    tool_call: acp::ToolCallUpdate::new(
3052                        self.tool_use_id.to_string(),
3053                        acp::ToolCallUpdateFields::new().title(title.into()),
3054                    ),
3055                    options: acp_thread::PermissionOptions::Dropdown(vec![
3056                        acp_thread::PermissionOptionChoice {
3057                            allow: acp::PermissionOption::new(
3058                                acp::PermissionOptionId::new(format!(
3059                                    "always_allow_mcp:{}",
3060                                    tool_id
3061                                )),
3062                                format!("Always for {} MCP tool", display_name),
3063                                acp::PermissionOptionKind::AllowAlways,
3064                            ),
3065                            deny: acp::PermissionOption::new(
3066                                acp::PermissionOptionId::new(format!(
3067                                    "always_deny_mcp:{}",
3068                                    tool_id
3069                                )),
3070                                format!("Always for {} MCP tool", display_name),
3071                                acp::PermissionOptionKind::RejectAlways,
3072                            ),
3073                        },
3074                        acp_thread::PermissionOptionChoice {
3075                            allow: acp::PermissionOption::new(
3076                                acp::PermissionOptionId::new("allow"),
3077                                "Only this time",
3078                                acp::PermissionOptionKind::AllowOnce,
3079                            ),
3080                            deny: acp::PermissionOption::new(
3081                                acp::PermissionOptionId::new("deny"),
3082                                "Only this time",
3083                                acp::PermissionOptionKind::RejectOnce,
3084                            ),
3085                        },
3086                    ]),
3087                    response: response_tx,
3088                    context: None,
3089                },
3090            )))
3091            .ok();
3092
3093        let fs = self.fs.clone();
3094        cx.spawn(async move |cx| {
3095            let response_str = response_rx.await?.0.to_string();
3096
3097            if response_str == format!("always_allow_mcp:{}", tool_id) {
3098                if let Some(fs) = fs.clone() {
3099                    cx.update(|cx| {
3100                        update_settings_file(fs, cx, move |settings, _| {
3101                            settings
3102                                .agent
3103                                .get_or_insert_default()
3104                                .set_tool_default_mode(&tool_id, ToolPermissionMode::Allow);
3105                        });
3106                    });
3107                }
3108                return Ok(());
3109            }
3110            if response_str == format!("always_deny_mcp:{}", tool_id) {
3111                if let Some(fs) = fs.clone() {
3112                    cx.update(|cx| {
3113                        update_settings_file(fs, cx, move |settings, _| {
3114                            settings
3115                                .agent
3116                                .get_or_insert_default()
3117                                .set_tool_default_mode(&tool_id, ToolPermissionMode::Deny);
3118                        });
3119                    });
3120                }
3121                return Err(anyhow!("Permission to run tool denied by user"));
3122            }
3123
3124            if response_str == "allow" {
3125                return Ok(());
3126            }
3127
3128            Err(anyhow!("Permission to run tool denied by user"))
3129        })
3130    }
3131
3132    pub fn authorize(
3133        &self,
3134        title: impl Into<String>,
3135        context: ToolPermissionContext,
3136        cx: &mut App,
3137    ) -> Task<Result<()>> {
3138        use settings::ToolPermissionMode;
3139
3140        let options = context.build_permission_options();
3141
3142        let (response_tx, response_rx) = oneshot::channel();
3143        self.stream
3144            .0
3145            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3146                ToolCallAuthorization {
3147                    tool_call: acp::ToolCallUpdate::new(
3148                        self.tool_use_id.to_string(),
3149                        acp::ToolCallUpdateFields::new().title(title.into()),
3150                    ),
3151                    options,
3152                    response: response_tx,
3153                    context: Some(context),
3154                },
3155            )))
3156            .ok();
3157
3158        let fs = self.fs.clone();
3159        cx.spawn(async move |cx| {
3160            let response_str = response_rx.await?.0.to_string();
3161
3162            // Handle "always allow tool" - e.g., "always_allow:terminal"
3163            if let Some(tool) = response_str.strip_prefix("always_allow:") {
3164                if let Some(fs) = fs.clone() {
3165                    let tool = tool.to_string();
3166                    cx.update(|cx| {
3167                        update_settings_file(fs, cx, move |settings, _| {
3168                            settings
3169                                .agent
3170                                .get_or_insert_default()
3171                                .set_tool_default_mode(&tool, ToolPermissionMode::Allow);
3172                        });
3173                    });
3174                }
3175                return Ok(());
3176            }
3177
3178            // Handle "always deny tool" - e.g., "always_deny:terminal"
3179            if let Some(tool) = response_str.strip_prefix("always_deny:") {
3180                if let Some(fs) = fs.clone() {
3181                    let tool = tool.to_string();
3182                    cx.update(|cx| {
3183                        update_settings_file(fs, cx, move |settings, _| {
3184                            settings
3185                                .agent
3186                                .get_or_insert_default()
3187                                .set_tool_default_mode(&tool, ToolPermissionMode::Deny);
3188                        });
3189                    });
3190                }
3191                return Err(anyhow!("Permission to run tool denied by user"));
3192            }
3193
3194            // Handle "always allow pattern" - e.g., "always_allow_pattern:terminal:^cargo\s"
3195            if response_str.starts_with("always_allow_pattern:") {
3196                let parts: Vec<&str> = response_str.splitn(3, ':').collect();
3197                if parts.len() == 3 {
3198                    let pattern_tool_name = parts[1].to_string();
3199                    let pattern = parts[2].to_string();
3200                    if let Some(fs) = fs.clone() {
3201                        cx.update(|cx| {
3202                            update_settings_file(fs, cx, move |settings, _| {
3203                                settings
3204                                    .agent
3205                                    .get_or_insert_default()
3206                                    .add_tool_allow_pattern(&pattern_tool_name, pattern);
3207                            });
3208                        });
3209                    }
3210                }
3211                return Ok(());
3212            }
3213
3214            // Handle "always deny pattern" - e.g., "always_deny_pattern:terminal:^cargo\s"
3215            if response_str.starts_with("always_deny_pattern:") {
3216                let parts: Vec<&str> = response_str.splitn(3, ':').collect();
3217                if parts.len() == 3 {
3218                    let pattern_tool_name = parts[1].to_string();
3219                    let pattern = parts[2].to_string();
3220                    if let Some(fs) = fs.clone() {
3221                        cx.update(|cx| {
3222                            update_settings_file(fs, cx, move |settings, _| {
3223                                settings
3224                                    .agent
3225                                    .get_or_insert_default()
3226                                    .add_tool_deny_pattern(&pattern_tool_name, pattern);
3227                            });
3228                        });
3229                    }
3230                }
3231                return Err(anyhow!("Permission to run tool denied by user"));
3232            }
3233
3234            // Handle simple "allow" (allow once)
3235            if response_str == "allow" {
3236                return Ok(());
3237            }
3238
3239            // Handle simple "deny" (deny once)
3240            Err(anyhow!("Permission to run tool denied by user"))
3241        })
3242    }
3243}
3244
/// Newtype around an unbounded mpsc receiver of `Result<ThreadEvent>`.
///
/// Only compiled for tests or when the `test-support` feature is enabled.
// NOTE(review): presumably the receiving half paired with a `ToolCallEventStream`
// created for tests — confirm against the constructor.
#[cfg(any(test, feature = "test-support"))]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
3247
#[cfg(any(test, feature = "test-support"))]
impl ToolCallEventStreamReceiver {
    /// Waits for the next event and returns it as a tool call authorization.
    ///
    /// # Panics
    ///
    /// Panics if the stream has ended, yielded an error, or yielded any other
    /// kind of event.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(authorization))) => authorization,
            unexpected => panic!("Expected ToolCallAuthorization but got: {:?}", unexpected),
        }
    }

    /// Waits for the next event and returns the fields of an `UpdateFields`
    /// tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything other than an `UpdateFields` update.
    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                fields_update,
            )))) => fields_update.fields,
            unexpected => panic!("Expected update fields but got: {:?}", unexpected),
        }
    }

    /// Waits for the next event and returns the diff entity of an `UpdateDiff`
    /// tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything other than an `UpdateDiff` update.
    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
                diff_update,
            )))) => diff_update.diff,
            unexpected => panic!("Expected diff but got: {:?}", unexpected),
        }
    }

    /// Waits for the next event and returns the terminal entity of an
    /// `UpdateTerminal` tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything other than an `UpdateTerminal` update.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
                terminal_update,
            )))) => terminal_update.terminal,
            unexpected => panic!("Expected terminal but got: {:?}", unexpected),
        }
    }
}
3295
/// Exposes the wrapped receiver so it can be used directly through a shared reference.
#[cfg(any(test, feature = "test-support"))]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
3304
/// Exposes the wrapped receiver mutably, e.g. for polling it as a stream.
#[cfg(any(test, feature = "test-support"))]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
3311
3312impl From<&str> for UserMessageContent {
3313    fn from(text: &str) -> Self {
3314        Self::Text(text.into())
3315    }
3316}
3317
3318impl UserMessageContent {
3319    pub fn from_content_block(value: acp::ContentBlock, path_style: PathStyle) -> Self {
3320        match value {
3321            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
3322            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
3323            acp::ContentBlock::Audio(_) => {
3324                // TODO
3325                Self::Text("[audio]".to_string())
3326            }
3327            acp::ContentBlock::ResourceLink(resource_link) => {
3328                match MentionUri::parse(&resource_link.uri, path_style) {
3329                    Ok(uri) => Self::Mention {
3330                        uri,
3331                        content: String::new(),
3332                    },
3333                    Err(err) => {
3334                        log::error!("Failed to parse mention link: {}", err);
3335                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
3336                    }
3337                }
3338            }
3339            acp::ContentBlock::Resource(resource) => match resource.resource {
3340                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
3341                    match MentionUri::parse(&resource.uri, path_style) {
3342                        Ok(uri) => Self::Mention {
3343                            uri,
3344                            content: resource.text,
3345                        },
3346                        Err(err) => {
3347                            log::error!("Failed to parse mention link: {}", err);
3348                            Self::Text(
3349                                MarkdownCodeBlock {
3350                                    tag: &resource.uri,
3351                                    text: &resource.text,
3352                                }
3353                                .to_string(),
3354                            )
3355                        }
3356                    }
3357                }
3358                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
3359                    // TODO
3360                    Self::Text("[blob]".to_string())
3361                }
3362                other => {
3363                    log::warn!("Unexpected content type: {:?}", other);
3364                    Self::Text("[unknown]".to_string())
3365                }
3366            },
3367            other => {
3368                log::warn!("Unexpected content type: {:?}", other);
3369                Self::Text("[unknown]".to_string())
3370            }
3371        }
3372    }
3373}
3374
3375impl From<UserMessageContent> for acp::ContentBlock {
3376    fn from(content: UserMessageContent) -> Self {
3377        match content {
3378            UserMessageContent::Text(text) => text.into(),
3379            UserMessageContent::Image(image) => {
3380                acp::ContentBlock::Image(acp::ImageContent::new(image.source, "image/png"))
3381            }
3382            UserMessageContent::Mention { uri, content } => acp::ContentBlock::Resource(
3383                acp::EmbeddedResource::new(acp::EmbeddedResourceResource::TextResourceContents(
3384                    acp::TextResourceContents::new(content, uri.to_uri().to_string()),
3385                )),
3386            ),
3387        }
3388    }
3389}
3390
3391fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
3392    LanguageModelImage {
3393        source: image_content.data.into(),
3394        size: None,
3395    }
3396}