thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ProjectSnapshot, ReadFileTool,
   5    RestoreFileFromDiskTool, SaveFileTool, StreamingEditFileTool, SubagentTool,
   6    SystemPromptTemplate, Template, Templates, TerminalTool, ThinkingTool, ToolPermissionDecision,
   7    WebSearchTool, decide_permission_from_settings,
   8};
   9use acp_thread::{MentionUri, UserMessageId};
  10use action_log::ActionLog;
  11use feature_flags::{FeatureFlagAppExt as _, SubagentsFeatureFlag};
  12
  13use agent_client_protocol as acp;
  14use agent_settings::{
  15    AgentProfileId, AgentProfileSettings, AgentSettings, SUMMARIZE_THREAD_DETAILED_PROMPT,
  16    SUMMARIZE_THREAD_PROMPT,
  17};
  18use anyhow::{Context as _, Result, anyhow};
  19use chrono::{DateTime, Utc};
  20use client::UserStore;
  21use cloud_api_types::Plan;
  22use cloud_llm_client::CompletionIntent;
  23use collections::{HashMap, HashSet, IndexMap};
  24use fs::Fs;
  25use futures::stream;
  26use futures::{
  27    FutureExt,
  28    channel::{mpsc, oneshot},
  29    future::Shared,
  30    stream::FuturesUnordered,
  31};
  32use gpui::{
  33    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  34};
  35use language_model::{
  36    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  37    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  38    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  39    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  40    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage, ZED_CLOUD_PROVIDER_ID,
  41};
  42use project::Project;
  43use prompt_store::ProjectContext;
  44use schemars::{JsonSchema, Schema};
  45use serde::{Deserialize, Serialize};
  46use settings::{LanguageModelSelection, Settings, ToolPermissionMode, update_settings_file};
  47use smol::stream::StreamExt;
  48use std::{
  49    collections::BTreeMap,
  50    ops::RangeInclusive,
  51    path::Path,
  52    rc::Rc,
  53    sync::Arc,
  54    time::{Duration, Instant},
  55};
  56use std::{fmt::Write, path::PathBuf};
  57use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock, paths::PathStyle};
  58use uuid::Uuid;
  59
  60const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
  61pub const MAX_TOOL_NAME_LENGTH: usize = 64;
  62pub const MAX_SUBAGENT_DEPTH: u8 = 4;
  63pub const MAX_PARALLEL_SUBAGENTS: usize = 8;
  64
  65/// Context passed to a subagent thread for lifecycle management
  66#[derive(Clone)]
  67pub struct SubagentContext {
  68    /// ID of the parent thread
  69    pub parent_thread_id: acp::SessionId,
  70
  71    /// ID of the tool call that spawned this subagent
  72    pub tool_use_id: LanguageModelToolUseId,
  73
  74    /// Current depth level (0 = root agent, 1 = first-level subagent, etc.)
  75    pub depth: u8,
  76
  77    /// Prompt to send when subagent completes successfully
  78    pub summary_prompt: String,
  79
  80    /// Prompt to send when context is running low (≤25% remaining)
  81    pub context_low_prompt: String,
  82}
  83
  84/// The ID of the user prompt that initiated a request.
  85///
  86/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
  87#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  88pub struct PromptId(Arc<str>);
  89
  90impl PromptId {
  91    pub fn new() -> Self {
  92        Self(Uuid::new_v4().to_string().into())
  93    }
  94}
  95
  96impl std::fmt::Display for PromptId {
  97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  98        write!(f, "{}", self.0)
  99    }
 100}
 101
 102pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
 103pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);
 104
 105#[derive(Debug, Clone)]
 106enum RetryStrategy {
 107    ExponentialBackoff {
 108        initial_delay: Duration,
 109        max_attempts: u8,
 110    },
 111    Fixed {
 112        delay: Duration,
 113        max_attempts: u8,
 114    },
 115}
 116
 117#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 118pub enum Message {
 119    User(UserMessage),
 120    Agent(AgentMessage),
 121    Resume,
 122}
 123
 124impl Message {
 125    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 126        match self {
 127            Message::Agent(agent_message) => Some(agent_message),
 128            _ => None,
 129        }
 130    }
 131
 132    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 133        match self {
 134            Message::User(message) => {
 135                if message.content.is_empty() {
 136                    vec![]
 137                } else {
 138                    vec![message.to_request()]
 139                }
 140            }
 141            Message::Agent(message) => message.to_request(),
 142            Message::Resume => vec![LanguageModelRequestMessage {
 143                role: Role::User,
 144                content: vec!["Continue where you left off".into()],
 145                cache: false,
 146                reasoning_details: None,
 147            }],
 148        }
 149    }
 150
 151    pub fn to_markdown(&self) -> String {
 152        match self {
 153            Message::User(message) => message.to_markdown(),
 154            Message::Agent(message) => message.to_markdown(),
 155            Message::Resume => "[resume]\n".into(),
 156        }
 157    }
 158
 159    pub fn role(&self) -> Role {
 160        match self {
 161            Message::User(_) | Message::Resume => Role::User,
 162            Message::Agent(_) => Role::Assistant,
 163        }
 164    }
 165}
 166
 167#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 168pub struct UserMessage {
 169    pub id: UserMessageId,
 170    pub content: Vec<UserMessageContent>,
 171}
 172
 173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 174pub enum UserMessageContent {
 175    Text(String),
 176    Mention { uri: MentionUri, content: String },
 177    Image(LanguageModelImage),
 178}
 179
 180impl UserMessage {
 181    pub fn to_markdown(&self) -> String {
 182        let mut markdown = String::from("## User\n\n");
 183
 184        for content in &self.content {
 185            match content {
 186                UserMessageContent::Text(text) => {
 187                    markdown.push_str(text);
 188                    markdown.push('\n');
 189                }
 190                UserMessageContent::Image(_) => {
 191                    markdown.push_str("<image />\n");
 192                }
 193                UserMessageContent::Mention { uri, content } => {
 194                    if !content.is_empty() {
 195                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 196                    } else {
 197                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 198                    }
 199                }
 200            }
 201        }
 202
 203        markdown
 204    }
 205
 206    fn to_request(&self) -> LanguageModelRequestMessage {
 207        let mut message = LanguageModelRequestMessage {
 208            role: Role::User,
 209            content: Vec::with_capacity(self.content.len()),
 210            cache: false,
 211            reasoning_details: None,
 212        };
 213
 214        const OPEN_CONTEXT: &str = "<context>\n\
 215            The following items were attached by the user. \
 216            They are up-to-date and don't need to be re-read.\n\n";
 217
 218        const OPEN_FILES_TAG: &str = "<files>";
 219        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 220        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 221        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 222        const OPEN_THREADS_TAG: &str = "<threads>";
 223        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 224        const OPEN_RULES_TAG: &str =
 225            "<rules>\nThe user has specified the following rules that should be applied:\n";
 226        const OPEN_DIAGNOSTICS_TAG: &str = "<diagnostics>";
 227
 228        let mut file_context = OPEN_FILES_TAG.to_string();
 229        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 230        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 231        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 232        let mut thread_context = OPEN_THREADS_TAG.to_string();
 233        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 234        let mut rules_context = OPEN_RULES_TAG.to_string();
 235        let mut diagnostics_context = OPEN_DIAGNOSTICS_TAG.to_string();
 236
 237        for chunk in &self.content {
 238            let chunk = match chunk {
 239                UserMessageContent::Text(text) => {
 240                    language_model::MessageContent::Text(text.clone())
 241                }
 242                UserMessageContent::Image(value) => {
 243                    language_model::MessageContent::Image(value.clone())
 244                }
 245                UserMessageContent::Mention { uri, content } => {
 246                    match uri {
 247                        MentionUri::File { abs_path } => {
 248                            write!(
 249                                &mut file_context,
 250                                "\n{}",
 251                                MarkdownCodeBlock {
 252                                    tag: &codeblock_tag(abs_path, None),
 253                                    text: &content.to_string(),
 254                                }
 255                            )
 256                            .ok();
 257                        }
 258                        MentionUri::PastedImage => {
 259                            debug_panic!("pasted image URI should not be used in mention content")
 260                        }
 261                        MentionUri::Directory { .. } => {
 262                            write!(&mut directory_context, "\n{}\n", content).ok();
 263                        }
 264                        MentionUri::Symbol {
 265                            abs_path: path,
 266                            line_range,
 267                            ..
 268                        } => {
 269                            write!(
 270                                &mut symbol_context,
 271                                "\n{}",
 272                                MarkdownCodeBlock {
 273                                    tag: &codeblock_tag(path, Some(line_range)),
 274                                    text: content
 275                                }
 276                            )
 277                            .ok();
 278                        }
 279                        MentionUri::Selection {
 280                            abs_path: path,
 281                            line_range,
 282                            ..
 283                        } => {
 284                            write!(
 285                                &mut selection_context,
 286                                "\n{}",
 287                                MarkdownCodeBlock {
 288                                    tag: &codeblock_tag(
 289                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 290                                        Some(line_range)
 291                                    ),
 292                                    text: content
 293                                }
 294                            )
 295                            .ok();
 296                        }
 297                        MentionUri::Thread { .. } => {
 298                            write!(&mut thread_context, "\n{}\n", content).ok();
 299                        }
 300                        MentionUri::TextThread { .. } => {
 301                            write!(&mut thread_context, "\n{}\n", content).ok();
 302                        }
 303                        MentionUri::Rule { .. } => {
 304                            write!(
 305                                &mut rules_context,
 306                                "\n{}",
 307                                MarkdownCodeBlock {
 308                                    tag: "",
 309                                    text: content
 310                                }
 311                            )
 312                            .ok();
 313                        }
 314                        MentionUri::Fetch { url } => {
 315                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 316                        }
 317                        MentionUri::Diagnostics { .. } => {
 318                            write!(&mut diagnostics_context, "\n{}\n", content).ok();
 319                        }
 320                        MentionUri::TerminalSelection { .. } => {
 321                            write!(
 322                                &mut selection_context,
 323                                "\n{}",
 324                                MarkdownCodeBlock {
 325                                    tag: "console",
 326                                    text: content
 327                                }
 328                            )
 329                            .ok();
 330                        }
 331                    }
 332
 333                    language_model::MessageContent::Text(uri.as_link().to_string())
 334                }
 335            };
 336
 337            message.content.push(chunk);
 338        }
 339
 340        let len_before_context = message.content.len();
 341
 342        if file_context.len() > OPEN_FILES_TAG.len() {
 343            file_context.push_str("</files>\n");
 344            message
 345                .content
 346                .push(language_model::MessageContent::Text(file_context));
 347        }
 348
 349        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 350            directory_context.push_str("</directories>\n");
 351            message
 352                .content
 353                .push(language_model::MessageContent::Text(directory_context));
 354        }
 355
 356        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 357            symbol_context.push_str("</symbols>\n");
 358            message
 359                .content
 360                .push(language_model::MessageContent::Text(symbol_context));
 361        }
 362
 363        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 364            selection_context.push_str("</selections>\n");
 365            message
 366                .content
 367                .push(language_model::MessageContent::Text(selection_context));
 368        }
 369
 370        if thread_context.len() > OPEN_THREADS_TAG.len() {
 371            thread_context.push_str("</threads>\n");
 372            message
 373                .content
 374                .push(language_model::MessageContent::Text(thread_context));
 375        }
 376
 377        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 378            fetch_context.push_str("</fetched_urls>\n");
 379            message
 380                .content
 381                .push(language_model::MessageContent::Text(fetch_context));
 382        }
 383
 384        if rules_context.len() > OPEN_RULES_TAG.len() {
 385            rules_context.push_str("</user_rules>\n");
 386            message
 387                .content
 388                .push(language_model::MessageContent::Text(rules_context));
 389        }
 390
 391        if diagnostics_context.len() > OPEN_DIAGNOSTICS_TAG.len() {
 392            diagnostics_context.push_str("</diagnostics>\n");
 393            message
 394                .content
 395                .push(language_model::MessageContent::Text(diagnostics_context));
 396        }
 397
 398        if message.content.len() > len_before_context {
 399            message.content.insert(
 400                len_before_context,
 401                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 402            );
 403            message
 404                .content
 405                .push(language_model::MessageContent::Text("</context>".into()));
 406        }
 407
 408        message
 409    }
 410}
 411
 412fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 413    let mut result = String::new();
 414
 415    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 416        let _ = write!(result, "{} ", extension);
 417    }
 418
 419    let _ = write!(result, "{}", full_path.display());
 420
 421    if let Some(range) = line_range {
 422        if range.start() == range.end() {
 423            let _ = write!(result, ":{}", range.start() + 1);
 424        } else {
 425            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 426        }
 427    }
 428
 429    result
 430}
 431
 432impl AgentMessage {
 433    pub fn to_markdown(&self) -> String {
 434        let mut markdown = String::from("## Assistant\n\n");
 435
 436        for content in &self.content {
 437            match content {
 438                AgentMessageContent::Text(text) => {
 439                    markdown.push_str(text);
 440                    markdown.push('\n');
 441                }
 442                AgentMessageContent::Thinking { text, .. } => {
 443                    markdown.push_str("<think>");
 444                    markdown.push_str(text);
 445                    markdown.push_str("</think>\n");
 446                }
 447                AgentMessageContent::RedactedThinking(_) => {
 448                    markdown.push_str("<redacted_thinking />\n")
 449                }
 450                AgentMessageContent::ToolUse(tool_use) => {
 451                    markdown.push_str(&format!(
 452                        "**Tool Use**: {} (ID: {})\n",
 453                        tool_use.name, tool_use.id
 454                    ));
 455                    markdown.push_str(&format!(
 456                        "{}\n",
 457                        MarkdownCodeBlock {
 458                            tag: "json",
 459                            text: &format!("{:#}", tool_use.input)
 460                        }
 461                    ));
 462                }
 463            }
 464        }
 465
 466        for tool_result in self.tool_results.values() {
 467            markdown.push_str(&format!(
 468                "**Tool Result**: {} (ID: {})\n\n",
 469                tool_result.tool_name, tool_result.tool_use_id
 470            ));
 471            if tool_result.is_error {
 472                markdown.push_str("**ERROR:**\n");
 473            }
 474
 475            match &tool_result.content {
 476                LanguageModelToolResultContent::Text(text) => {
 477                    writeln!(markdown, "{text}\n").ok();
 478                }
 479                LanguageModelToolResultContent::Image(_) => {
 480                    writeln!(markdown, "<image />\n").ok();
 481                }
 482            }
 483
 484            if let Some(output) = tool_result.output.as_ref() {
 485                writeln!(
 486                    markdown,
 487                    "**Debug Output**:\n\n```json\n{}\n```\n",
 488                    serde_json::to_string_pretty(output).unwrap()
 489                )
 490                .unwrap();
 491            }
 492        }
 493
 494        markdown
 495    }
 496
 497    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 498        let mut assistant_message = LanguageModelRequestMessage {
 499            role: Role::Assistant,
 500            content: Vec::with_capacity(self.content.len()),
 501            cache: false,
 502            reasoning_details: self.reasoning_details.clone(),
 503        };
 504        for chunk in &self.content {
 505            match chunk {
 506                AgentMessageContent::Text(text) => {
 507                    assistant_message
 508                        .content
 509                        .push(language_model::MessageContent::Text(text.clone()));
 510                }
 511                AgentMessageContent::Thinking { text, signature } => {
 512                    assistant_message
 513                        .content
 514                        .push(language_model::MessageContent::Thinking {
 515                            text: text.clone(),
 516                            signature: signature.clone(),
 517                        });
 518                }
 519                AgentMessageContent::RedactedThinking(value) => {
 520                    assistant_message.content.push(
 521                        language_model::MessageContent::RedactedThinking(value.clone()),
 522                    );
 523                }
 524                AgentMessageContent::ToolUse(tool_use) => {
 525                    if self.tool_results.contains_key(&tool_use.id) {
 526                        assistant_message
 527                            .content
 528                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 529                    }
 530                }
 531            };
 532        }
 533
 534        let mut user_message = LanguageModelRequestMessage {
 535            role: Role::User,
 536            content: Vec::new(),
 537            cache: false,
 538            reasoning_details: None,
 539        };
 540
 541        for tool_result in self.tool_results.values() {
 542            let mut tool_result = tool_result.clone();
 543            // Surprisingly, the API fails if we return an empty string here.
 544            // It thinks we are sending a tool use without a tool result.
 545            if tool_result.content.is_empty() {
 546                tool_result.content = "<Tool returned an empty string>".into();
 547            }
 548            user_message
 549                .content
 550                .push(language_model::MessageContent::ToolResult(tool_result));
 551        }
 552
 553        let mut messages = Vec::new();
 554        if !assistant_message.content.is_empty() {
 555            messages.push(assistant_message);
 556        }
 557        if !user_message.content.is_empty() {
 558            messages.push(user_message);
 559        }
 560        messages
 561    }
 562}
 563
 564#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 565pub struct AgentMessage {
 566    pub content: Vec<AgentMessageContent>,
 567    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
 568    pub reasoning_details: Option<serde_json::Value>,
 569}
 570
 571#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 572pub enum AgentMessageContent {
 573    Text(String),
 574    Thinking {
 575        text: String,
 576        signature: Option<String>,
 577    },
 578    RedactedThinking(String),
 579    ToolUse(LanguageModelToolUse),
 580}
 581
 582pub trait TerminalHandle {
 583    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
 584    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
 585    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
 586    fn kill(&self, cx: &AsyncApp) -> Result<()>;
 587    fn was_stopped_by_user(&self, cx: &AsyncApp) -> Result<bool>;
 588}
 589
 590pub trait ThreadEnvironment {
 591    fn create_terminal(
 592        &self,
 593        command: String,
 594        cwd: Option<PathBuf>,
 595        output_byte_limit: Option<u64>,
 596        cx: &mut AsyncApp,
 597    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
 598}
 599
 600#[derive(Debug)]
 601pub enum ThreadEvent {
 602    UserMessage(UserMessage),
 603    AgentText(String),
 604    AgentThinking(String),
 605    ToolCall(acp::ToolCall),
 606    ToolCallUpdate(acp_thread::ToolCallUpdate),
 607    ToolCallAuthorization(ToolCallAuthorization),
 608    Retry(acp_thread::RetryStatus),
 609    Stop(acp::StopReason),
 610}
 611
 612#[derive(Debug)]
 613pub struct NewTerminal {
 614    pub command: String,
 615    pub output_byte_limit: Option<u64>,
 616    pub cwd: Option<PathBuf>,
 617    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
 618}
 619
 620#[derive(Debug, Clone)]
 621pub struct ToolPermissionContext {
 622    pub tool_name: String,
 623    pub input_value: String,
 624}
 625
 626impl ToolPermissionContext {
 627    pub fn new(tool_name: impl Into<String>, input_value: impl Into<String>) -> Self {
 628        Self {
 629            tool_name: tool_name.into(),
 630            input_value: input_value.into(),
 631        }
 632    }
 633
 634    /// Builds the permission options for this tool context.
 635    ///
 636    /// This is the canonical source for permission option generation.
 637    /// Tests should use this function rather than manually constructing options.
 638    ///
 639    /// # Shell Compatibility for Terminal Tool
 640    ///
 641    /// For the terminal tool, "Always allow" options are only shown when the user's
 642    /// shell supports POSIX-like command chaining syntax (`&&`, `||`, `;`, `|`).
 643    ///
 644    /// **Why this matters:** When a user sets up an "always allow" pattern like `^cargo`,
 645    /// we need to parse the command to extract all sub-commands and verify that EVERY
 646    /// sub-command matches the pattern. Otherwise, an attacker could craft a command like
 647    /// `cargo build && rm -rf /` that would bypass the security check.
 648    ///
 649    /// **Supported shells:** Posix (sh, bash, dash, zsh), Fish 3.0+, PowerShell 7+/Pwsh,
 650    /// Cmd, Xonsh, Csh, Tcsh
 651    ///
 652    /// **Unsupported shells:** Nushell (uses `and`/`or` keywords), Elvish (uses `and`/`or`
 653    /// keywords), Rc (Plan 9 shell - no `&&`/`||` operators)
 654    ///
 655    /// For unsupported shells, we hide the "Always allow" UI options entirely, and if
 656    /// the user has `always_allow` rules configured in settings, `ToolPermissionDecision::from_input`
 657    /// will return a `Deny` with an explanatory error message.
 658    pub fn build_permission_options(&self) -> acp_thread::PermissionOptions {
 659        use crate::pattern_extraction::*;
 660        use util::shell::ShellKind;
 661
 662        let tool_name = &self.tool_name;
 663        let input_value = &self.input_value;
 664
 665        // Check if the user's shell supports POSIX-like command chaining.
 666        // See the doc comment above for the full explanation of why this is needed.
 667        let shell_supports_always_allow = if tool_name == TerminalTool::name() {
 668            ShellKind::system()
 669                .map(|k| k.supports_posix_chaining())
 670                .unwrap_or(false)
 671        } else {
 672            true
 673        };
 674
 675        let (pattern, pattern_display) = if tool_name == TerminalTool::name() {
 676            (
 677                extract_terminal_pattern(input_value),
 678                extract_terminal_pattern_display(input_value),
 679            )
 680        } else if tool_name == EditFileTool::name()
 681            || tool_name == DeletePathTool::name()
 682            || tool_name == MovePathTool::name()
 683            || tool_name == CreateDirectoryTool::name()
 684            || tool_name == SaveFileTool::name()
 685        {
 686            (
 687                extract_path_pattern(input_value),
 688                extract_path_pattern_display(input_value),
 689            )
 690        } else if tool_name == FetchTool::name() {
 691            (
 692                extract_url_pattern(input_value),
 693                extract_url_pattern_display(input_value),
 694            )
 695        } else {
 696            (None, None)
 697        };
 698
 699        let mut choices = Vec::new();
 700
 701        let mut push_choice = |label: String, allow_id, deny_id, allow_kind, deny_kind| {
 702            choices.push(acp_thread::PermissionOptionChoice {
 703                allow: acp::PermissionOption::new(
 704                    acp::PermissionOptionId::new(allow_id),
 705                    label.clone(),
 706                    allow_kind,
 707                ),
 708                deny: acp::PermissionOption::new(
 709                    acp::PermissionOptionId::new(deny_id),
 710                    label,
 711                    deny_kind,
 712                ),
 713            });
 714        };
 715
 716        if shell_supports_always_allow {
 717            push_choice(
 718                format!("Always for {}", tool_name.replace('_', " ")),
 719                format!("always_allow:{}", tool_name),
 720                format!("always_deny:{}", tool_name),
 721                acp::PermissionOptionKind::AllowAlways,
 722                acp::PermissionOptionKind::RejectAlways,
 723            );
 724
 725            if let (Some(pattern), Some(display)) = (pattern, pattern_display) {
 726                let button_text = if tool_name == TerminalTool::name() {
 727                    format!("Always for `{}` commands", display)
 728                } else {
 729                    format!("Always for `{}`", display)
 730                };
 731                push_choice(
 732                    button_text,
 733                    format!("always_allow_pattern:{}:{}", tool_name, pattern),
 734                    format!("always_deny_pattern:{}:{}", tool_name, pattern),
 735                    acp::PermissionOptionKind::AllowAlways,
 736                    acp::PermissionOptionKind::RejectAlways,
 737                );
 738            }
 739        }
 740
 741        push_choice(
 742            "Only this time".to_string(),
 743            "allow".to_string(),
 744            "deny".to_string(),
 745            acp::PermissionOptionKind::AllowOnce,
 746            acp::PermissionOptionKind::RejectOnce,
 747        );
 748
 749        acp_thread::PermissionOptions::Dropdown(choices)
 750    }
 751}
 752
 753#[derive(Debug)]
 754pub struct ToolCallAuthorization {
 755    pub tool_call: acp::ToolCallUpdate,
 756    pub options: acp_thread::PermissionOptions,
 757    pub response: oneshot::Sender<acp::PermissionOptionId>,
 758    pub context: Option<ToolPermissionContext>,
 759}
 760
 761#[derive(Debug, thiserror::Error)]
 762enum CompletionError {
 763    #[error("max tokens")]
 764    MaxTokens,
 765    #[error("refusal")]
 766    Refusal,
 767    #[error(transparent)]
 768    Other(#[from] anyhow::Error),
 769}
 770
 771pub struct Thread {
 772    id: acp::SessionId,
 773    prompt_id: PromptId,
 774    updated_at: DateTime<Utc>,
 775    title: Option<SharedString>,
 776    pending_title_generation: Option<Task<()>>,
 777    pending_summary_generation: Option<Shared<Task<Option<SharedString>>>>,
 778    summary: Option<SharedString>,
 779    messages: Vec<Message>,
 780    user_store: Entity<UserStore>,
 781    /// Holds the task that handles agent interaction until the end of the turn.
 782    /// Survives across multiple requests as the model performs tool calls and
 783    /// we run tools, report their results.
 784    running_turn: Option<RunningTurn>,
 785    /// Flag indicating the UI has a queued message waiting to be sent.
 786    /// Used to signal that the turn should end at the next message boundary.
 787    has_queued_message: bool,
 788    pending_message: Option<AgentMessage>,
 789    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
 790    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
 791    #[allow(unused)]
 792    cumulative_token_usage: TokenUsage,
 793    #[allow(unused)]
 794    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
 795    context_server_registry: Entity<ContextServerRegistry>,
 796    profile_id: AgentProfileId,
 797    project_context: Entity<ProjectContext>,
 798    templates: Arc<Templates>,
 799    model: Option<Arc<dyn LanguageModel>>,
 800    summarization_model: Option<Arc<dyn LanguageModel>>,
 801    thinking_enabled: bool,
 802    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
 803    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
 804    pub(crate) project: Entity<Project>,
 805    pub(crate) action_log: Entity<ActionLog>,
 806    /// Tracks the last time files were read by the agent, to detect external modifications
 807    pub(crate) file_read_times: HashMap<PathBuf, fs::MTime>,
 808    /// True if this thread was imported from a shared thread and can be synced.
 809    imported: bool,
 810    /// If this is a subagent thread, contains context about the parent
 811    subagent_context: Option<SubagentContext>,
 812    /// Weak references to running subagent threads for cancellation propagation
 813    running_subagents: Vec<WeakEntity<Thread>>,
 814}
 815
 816impl Thread {
 817    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 818        let image = model.map_or(true, |model| model.supports_images());
 819        acp::PromptCapabilities::new()
 820            .image(image)
 821            .embedded_context(true)
 822    }
 823
 824    pub fn new(
 825        project: Entity<Project>,
 826        project_context: Entity<ProjectContext>,
 827        context_server_registry: Entity<ContextServerRegistry>,
 828        templates: Arc<Templates>,
 829        model: Option<Arc<dyn LanguageModel>>,
 830        cx: &mut Context<Self>,
 831    ) -> Self {
 832        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 833        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 834        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 835            watch::channel(Self::prompt_capabilities(model.as_deref()));
 836        Self {
 837            id: acp::SessionId::new(uuid::Uuid::new_v4().to_string()),
 838            prompt_id: PromptId::new(),
 839            updated_at: Utc::now(),
 840            title: None,
 841            pending_title_generation: None,
 842            pending_summary_generation: None,
 843            summary: None,
 844            messages: Vec::new(),
 845            user_store: project.read(cx).user_store(),
 846            running_turn: None,
 847            has_queued_message: false,
 848            pending_message: None,
 849            tools: BTreeMap::default(),
 850            request_token_usage: HashMap::default(),
 851            cumulative_token_usage: TokenUsage::default(),
 852            initial_project_snapshot: {
 853                let project_snapshot = Self::project_snapshot(project.clone(), cx);
 854                cx.foreground_executor()
 855                    .spawn(async move { Some(project_snapshot.await) })
 856                    .shared()
 857            },
 858            context_server_registry,
 859            profile_id,
 860            project_context,
 861            templates,
 862            model,
 863            summarization_model: None,
 864            thinking_enabled: true,
 865            prompt_capabilities_tx,
 866            prompt_capabilities_rx,
 867            project,
 868            action_log,
 869            file_read_times: HashMap::default(),
 870            imported: false,
 871            subagent_context: None,
 872            running_subagents: Vec::new(),
 873        }
 874    }
 875
 876    pub fn new_subagent(
 877        project: Entity<Project>,
 878        project_context: Entity<ProjectContext>,
 879        context_server_registry: Entity<ContextServerRegistry>,
 880        templates: Arc<Templates>,
 881        model: Arc<dyn LanguageModel>,
 882        subagent_context: SubagentContext,
 883        parent_tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
 884        cx: &mut Context<Self>,
 885    ) -> Self {
 886        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 887        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 888        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 889            watch::channel(Self::prompt_capabilities(Some(model.as_ref())));
 890
 891        // Rebind tools that hold thread references to use this subagent's thread
 892        // instead of the parent's thread. This is critical for tools like EditFileTool
 893        // that make model requests using the thread's ID.
 894        let weak_self = cx.weak_entity();
 895        let tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>> = parent_tools
 896            .into_iter()
 897            .map(|(name, tool)| {
 898                let rebound = tool.rebind_thread(weak_self.clone()).unwrap_or(tool);
 899                (name, rebound)
 900            })
 901            .collect();
 902
 903        Self {
 904            id: acp::SessionId::new(uuid::Uuid::new_v4().to_string()),
 905            prompt_id: PromptId::new(),
 906            updated_at: Utc::now(),
 907            title: None,
 908            pending_title_generation: None,
 909            pending_summary_generation: None,
 910            summary: None,
 911            messages: Vec::new(),
 912            user_store: project.read(cx).user_store(),
 913            running_turn: None,
 914            has_queued_message: false,
 915            pending_message: None,
 916            tools,
 917            request_token_usage: HashMap::default(),
 918            cumulative_token_usage: TokenUsage::default(),
 919            initial_project_snapshot: Task::ready(None).shared(),
 920            context_server_registry,
 921            profile_id,
 922            project_context,
 923            templates,
 924            model: Some(model),
 925            summarization_model: None,
 926            thinking_enabled: true,
 927            prompt_capabilities_tx,
 928            prompt_capabilities_rx,
 929            project,
 930            action_log,
 931            file_read_times: HashMap::default(),
 932            imported: false,
 933            subagent_context: Some(subagent_context),
 934            running_subagents: Vec::new(),
 935        }
 936    }
 937
 938    pub fn id(&self) -> &acp::SessionId {
 939        &self.id
 940    }
 941
 942    /// Returns true if this thread was imported from a shared thread.
 943    pub fn is_imported(&self) -> bool {
 944        self.imported
 945    }
 946
 947    pub fn replay(
 948        &mut self,
 949        cx: &mut Context<Self>,
 950    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 951        let (tx, rx) = mpsc::unbounded();
 952        let stream = ThreadEventStream(tx);
 953        for message in &self.messages {
 954            match message {
 955                Message::User(user_message) => stream.send_user_message(user_message),
 956                Message::Agent(assistant_message) => {
 957                    for content in &assistant_message.content {
 958                        match content {
 959                            AgentMessageContent::Text(text) => stream.send_text(text),
 960                            AgentMessageContent::Thinking { text, .. } => {
 961                                stream.send_thinking(text)
 962                            }
 963                            AgentMessageContent::RedactedThinking(_) => {}
 964                            AgentMessageContent::ToolUse(tool_use) => {
 965                                self.replay_tool_call(
 966                                    tool_use,
 967                                    assistant_message.tool_results.get(&tool_use.id),
 968                                    &stream,
 969                                    cx,
 970                                );
 971                            }
 972                        }
 973                    }
 974                }
 975                Message::Resume => {}
 976            }
 977        }
 978        rx
 979    }
 980
 981    fn replay_tool_call(
 982        &self,
 983        tool_use: &LanguageModelToolUse,
 984        tool_result: Option<&LanguageModelToolResult>,
 985        stream: &ThreadEventStream,
 986        cx: &mut Context<Self>,
 987    ) {
 988        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 989            self.context_server_registry
 990                .read(cx)
 991                .servers()
 992                .find_map(|(_, tools)| {
 993                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 994                        Some(tool.clone())
 995                    } else {
 996                        None
 997                    }
 998                })
 999        });
1000
1001        let Some(tool) = tool else {
1002            stream
1003                .0
1004                .unbounded_send(Ok(ThreadEvent::ToolCall(
1005                    acp::ToolCall::new(tool_use.id.to_string(), tool_use.name.to_string())
1006                        .status(acp::ToolCallStatus::Failed)
1007                        .raw_input(tool_use.input.clone()),
1008                )))
1009                .ok();
1010            return;
1011        };
1012
1013        let title = tool.initial_title(tool_use.input.clone(), cx);
1014        let kind = tool.kind();
1015        stream.send_tool_call(
1016            &tool_use.id,
1017            &tool_use.name,
1018            title,
1019            kind,
1020            tool_use.input.clone(),
1021        );
1022
1023        let output = tool_result
1024            .as_ref()
1025            .and_then(|result| result.output.clone());
1026        if let Some(output) = output.clone() {
1027            // For replay, we use a dummy cancellation receiver since the tool already completed
1028            let (_cancellation_tx, cancellation_rx) = watch::channel(false);
1029            let tool_event_stream = ToolCallEventStream::new(
1030                tool_use.id.clone(),
1031                stream.clone(),
1032                Some(self.project.read(cx).fs().clone()),
1033                cancellation_rx,
1034            );
1035            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
1036                .log_err();
1037        }
1038
1039        stream.update_tool_call_fields(
1040            &tool_use.id,
1041            acp::ToolCallUpdateFields::new()
1042                .status(
1043                    tool_result
1044                        .as_ref()
1045                        .map_or(acp::ToolCallStatus::Failed, |result| {
1046                            if result.is_error {
1047                                acp::ToolCallStatus::Failed
1048                            } else {
1049                                acp::ToolCallStatus::Completed
1050                            }
1051                        }),
1052                )
1053                .raw_output(output),
1054        );
1055    }
1056
1057    pub fn from_db(
1058        id: acp::SessionId,
1059        db_thread: DbThread,
1060        project: Entity<Project>,
1061        project_context: Entity<ProjectContext>,
1062        context_server_registry: Entity<ContextServerRegistry>,
1063        templates: Arc<Templates>,
1064        cx: &mut Context<Self>,
1065    ) -> Self {
1066        let profile_id = db_thread
1067            .profile
1068            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
1069
1070        let mut model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1071            db_thread
1072                .model
1073                .and_then(|model| {
1074                    let model = SelectedModel {
1075                        provider: model.provider.clone().into(),
1076                        model: model.model.into(),
1077                    };
1078                    registry.select_model(&model, cx)
1079                })
1080                .or_else(|| registry.default_model())
1081                .map(|model| model.model)
1082        });
1083
1084        if model.is_none() {
1085            model = Self::resolve_profile_model(&profile_id, cx);
1086        }
1087        if model.is_none() {
1088            model = LanguageModelRegistry::global(cx).update(cx, |registry, _cx| {
1089                registry.default_model().map(|model| model.model)
1090            });
1091        }
1092
1093        let (prompt_capabilities_tx, prompt_capabilities_rx) =
1094            watch::channel(Self::prompt_capabilities(model.as_deref()));
1095
1096        let action_log = cx.new(|_| ActionLog::new(project.clone()));
1097
1098        Self {
1099            id,
1100            prompt_id: PromptId::new(),
1101            title: if db_thread.title.is_empty() {
1102                None
1103            } else {
1104                Some(db_thread.title.clone())
1105            },
1106            pending_title_generation: None,
1107            pending_summary_generation: None,
1108            summary: db_thread.detailed_summary,
1109            messages: db_thread.messages,
1110            user_store: project.read(cx).user_store(),
1111            running_turn: None,
1112            has_queued_message: false,
1113            pending_message: None,
1114            tools: BTreeMap::default(),
1115            request_token_usage: db_thread.request_token_usage.clone(),
1116            cumulative_token_usage: db_thread.cumulative_token_usage,
1117            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
1118            context_server_registry,
1119            profile_id,
1120            project_context,
1121            templates,
1122            model,
1123            summarization_model: None,
1124            // TODO: Persist this on the `DbThread`.
1125            thinking_enabled: true,
1126            project,
1127            action_log,
1128            updated_at: db_thread.updated_at,
1129            prompt_capabilities_tx,
1130            prompt_capabilities_rx,
1131            file_read_times: HashMap::default(),
1132            imported: db_thread.imported,
1133            subagent_context: None,
1134            running_subagents: Vec::new(),
1135        }
1136    }
1137
1138    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
1139        let initial_project_snapshot = self.initial_project_snapshot.clone();
1140        let mut thread = DbThread {
1141            title: self.title(),
1142            messages: self.messages.clone(),
1143            updated_at: self.updated_at,
1144            detailed_summary: self.summary.clone(),
1145            initial_project_snapshot: None,
1146            cumulative_token_usage: self.cumulative_token_usage,
1147            request_token_usage: self.request_token_usage.clone(),
1148            model: self.model.as_ref().map(|model| DbLanguageModel {
1149                provider: model.provider_id().to_string(),
1150                model: model.name().0.to_string(),
1151            }),
1152            profile: Some(self.profile_id.clone()),
1153            imported: self.imported,
1154        };
1155
1156        cx.background_spawn(async move {
1157            let initial_project_snapshot = initial_project_snapshot.await;
1158            thread.initial_project_snapshot = initial_project_snapshot;
1159            thread
1160        })
1161    }
1162
1163    /// Create a snapshot of the current project state including git information and unsaved buffers.
1164    fn project_snapshot(
1165        project: Entity<Project>,
1166        cx: &mut Context<Self>,
1167    ) -> Task<Arc<ProjectSnapshot>> {
1168        let task = project::telemetry_snapshot::TelemetrySnapshot::new(&project, cx);
1169        cx.spawn(async move |_, _| {
1170            let snapshot = task.await;
1171
1172            Arc::new(ProjectSnapshot {
1173                worktree_snapshots: snapshot.worktree_snapshots,
1174                timestamp: Utc::now(),
1175            })
1176        })
1177    }
1178
1179    pub fn project_context(&self) -> &Entity<ProjectContext> {
1180        &self.project_context
1181    }
1182
1183    pub fn project(&self) -> &Entity<Project> {
1184        &self.project
1185    }
1186
1187    pub fn action_log(&self) -> &Entity<ActionLog> {
1188        &self.action_log
1189    }
1190
1191    pub fn is_empty(&self) -> bool {
1192        self.messages.is_empty() && self.title.is_none()
1193    }
1194
1195    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
1196        self.model.as_ref()
1197    }
1198
1199    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
1200        let old_usage = self.latest_token_usage();
1201        self.model = Some(model);
1202        let new_caps = Self::prompt_capabilities(self.model.as_deref());
1203        let new_usage = self.latest_token_usage();
1204        if old_usage != new_usage {
1205            cx.emit(TokenUsageUpdated(new_usage));
1206        }
1207        self.prompt_capabilities_tx.send(new_caps).log_err();
1208        cx.notify()
1209    }
1210
1211    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
1212        self.summarization_model.as_ref()
1213    }
1214
1215    pub fn set_summarization_model(
1216        &mut self,
1217        model: Option<Arc<dyn LanguageModel>>,
1218        cx: &mut Context<Self>,
1219    ) {
1220        self.summarization_model = model;
1221        cx.notify()
1222    }
1223
1224    pub fn thinking_enabled(&self) -> bool {
1225        self.thinking_enabled
1226    }
1227
1228    pub fn set_thinking_enabled(&mut self, enabled: bool, cx: &mut Context<Self>) {
1229        self.thinking_enabled = enabled;
1230        cx.notify();
1231    }
1232
1233    pub fn last_message(&self) -> Option<Message> {
1234        if let Some(message) = self.pending_message.clone() {
1235            Some(Message::Agent(message))
1236        } else {
1237            self.messages.last().cloned()
1238        }
1239    }
1240
1241    pub fn add_default_tools(
1242        &mut self,
1243        environment: Rc<dyn ThreadEnvironment>,
1244        cx: &mut Context<Self>,
1245    ) {
1246        let language_registry = self.project.read(cx).languages().clone();
1247        self.add_tool(CopyPathTool::new(self.project.clone()));
1248        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
1249        self.add_tool(DeletePathTool::new(
1250            self.project.clone(),
1251            self.action_log.clone(),
1252        ));
1253        self.add_tool(DiagnosticsTool::new(self.project.clone()));
1254        self.add_tool(EditFileTool::new(
1255            self.project.clone(),
1256            cx.weak_entity(),
1257            language_registry.clone(),
1258            Templates::new(),
1259        ));
1260        self.add_tool(StreamingEditFileTool::new(
1261            self.project.clone(),
1262            cx.weak_entity(),
1263            language_registry,
1264            Templates::new(),
1265        ));
1266        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
1267        self.add_tool(FindPathTool::new(self.project.clone()));
1268        self.add_tool(GrepTool::new(self.project.clone()));
1269        self.add_tool(ListDirectoryTool::new(self.project.clone()));
1270        self.add_tool(MovePathTool::new(self.project.clone()));
1271        self.add_tool(NowTool);
1272        self.add_tool(OpenTool::new(self.project.clone()));
1273        self.add_tool(ReadFileTool::new(
1274            cx.weak_entity(),
1275            self.project.clone(),
1276            self.action_log.clone(),
1277        ));
1278        self.add_tool(SaveFileTool::new(self.project.clone()));
1279        self.add_tool(RestoreFileFromDiskTool::new(self.project.clone()));
1280        self.add_tool(TerminalTool::new(self.project.clone(), environment));
1281        self.add_tool(ThinkingTool);
1282        self.add_tool(WebSearchTool);
1283
1284        if cx.has_flag::<SubagentsFeatureFlag>() && self.depth() < MAX_SUBAGENT_DEPTH {
1285            let parent_tools = self.tools.clone();
1286            self.add_tool(SubagentTool::new(
1287                cx.weak_entity(),
1288                self.project.clone(),
1289                self.project_context.clone(),
1290                self.context_server_registry.clone(),
1291                self.templates.clone(),
1292                self.depth(),
1293                parent_tools,
1294            ));
1295        }
1296    }
1297
1298    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
1299        self.tools.insert(T::name().into(), tool.erase());
1300    }
1301
1302    pub fn remove_tool(&mut self, name: &str) -> bool {
1303        self.tools.remove(name).is_some()
1304    }
1305
1306    pub fn restrict_tools(&mut self, allowed: &collections::HashSet<SharedString>) {
1307        self.tools.retain(|name, _| allowed.contains(name));
1308    }
1309
1310    pub fn profile(&self) -> &AgentProfileId {
1311        &self.profile_id
1312    }
1313
1314    pub fn set_profile(&mut self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
1315        if self.profile_id == profile_id {
1316            return;
1317        }
1318
1319        self.profile_id = profile_id;
1320
1321        // Swap to the profile's preferred model when available.
1322        if let Some(model) = Self::resolve_profile_model(&self.profile_id, cx) {
1323            self.set_model(model, cx);
1324        }
1325    }
1326
1327    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1328        for subagent in self.running_subagents.drain(..) {
1329            if let Some(subagent) = subagent.upgrade() {
1330                subagent.update(cx, |thread, cx| thread.cancel(cx)).detach();
1331            }
1332        }
1333
1334        let Some(running_turn) = self.running_turn.take() else {
1335            self.flush_pending_message(cx);
1336            return Task::ready(());
1337        };
1338
1339        let turn_task = running_turn.cancel();
1340
1341        cx.spawn(async move |this, cx| {
1342            turn_task.await;
1343            this.update(cx, |this, cx| {
1344                this.flush_pending_message(cx);
1345            })
1346            .ok();
1347        })
1348    }
1349
1350    pub fn set_has_queued_message(&mut self, has_queued: bool) {
1351        self.has_queued_message = has_queued;
1352    }
1353
1354    pub fn has_queued_message(&self) -> bool {
1355        self.has_queued_message
1356    }
1357
1358    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1359        let Some(last_user_message) = self.last_user_message() else {
1360            return;
1361        };
1362
1363        self.request_token_usage
1364            .insert(last_user_message.id.clone(), update);
1365        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1366        cx.notify();
1367    }
1368
1369    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1370        self.cancel(cx).detach();
1371        // Clear pending message since cancel will try to flush it asynchronously,
1372        // and we don't want that content to be added after we truncate
1373        self.pending_message.take();
1374        let Some(position) = self.messages.iter().position(
1375            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1376        ) else {
1377            return Err(anyhow!("Message not found"));
1378        };
1379
1380        for message in self.messages.drain(position..) {
1381            match message {
1382                Message::User(message) => {
1383                    self.request_token_usage.remove(&message.id);
1384                }
1385                Message::Agent(_) | Message::Resume => {}
1386            }
1387        }
1388        self.clear_summary();
1389        cx.notify();
1390        Ok(())
1391    }
1392
1393    pub fn latest_request_token_usage(&self) -> Option<language_model::TokenUsage> {
1394        let last_user_message = self.last_user_message()?;
1395        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1396        Some(*tokens)
1397    }
1398
1399    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1400        let usage = self.latest_request_token_usage()?;
1401        let model = self.model.clone()?;
1402        Some(acp_thread::TokenUsage {
1403            max_tokens: model.max_token_count(),
1404            used_tokens: usage.total_tokens(),
1405            input_tokens: usage.input_tokens,
1406            output_tokens: usage.output_tokens,
1407        })
1408    }
1409
1410    /// Get the total input token count as of the message before the given message.
1411    ///
1412    /// Returns `None` if:
1413    /// - `target_id` is the first message (no previous message)
1414    /// - The previous message hasn't received a response yet (no usage data)
1415    /// - `target_id` is not found in the messages
1416    pub fn tokens_before_message(&self, target_id: &UserMessageId) -> Option<u64> {
1417        let mut previous_user_message_id: Option<&UserMessageId> = None;
1418
1419        for message in &self.messages {
1420            if let Message::User(user_msg) = message {
1421                if &user_msg.id == target_id {
1422                    let prev_id = previous_user_message_id?;
1423                    let usage = self.request_token_usage.get(prev_id)?;
1424                    return Some(usage.input_tokens);
1425                }
1426                previous_user_message_id = Some(&user_msg.id);
1427            }
1428        }
1429        None
1430    }
1431
1432    /// Look up the active profile and resolve its preferred model if one is configured.
1433    fn resolve_profile_model(
1434        profile_id: &AgentProfileId,
1435        cx: &mut Context<Self>,
1436    ) -> Option<Arc<dyn LanguageModel>> {
1437        let selection = AgentSettings::get_global(cx)
1438            .profiles
1439            .get(profile_id)?
1440            .default_model
1441            .clone()?;
1442        Self::resolve_model_from_selection(&selection, cx)
1443    }
1444
1445    /// Translate a stored model selection into the configured model from the registry.
1446    fn resolve_model_from_selection(
1447        selection: &LanguageModelSelection,
1448        cx: &mut Context<Self>,
1449    ) -> Option<Arc<dyn LanguageModel>> {
1450        let selected = SelectedModel {
1451            provider: LanguageModelProviderId::from(selection.provider.0.clone()),
1452            model: LanguageModelId::from(selection.model.clone()),
1453        };
1454        LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1455            registry
1456                .select_model(&selected, cx)
1457                .map(|configured| configured.model)
1458        })
1459    }
1460
1461    pub fn resume(
1462        &mut self,
1463        cx: &mut Context<Self>,
1464    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1465        self.messages.push(Message::Resume);
1466        cx.notify();
1467
1468        log::debug!("Total messages in thread: {}", self.messages.len());
1469        self.run_turn(cx)
1470    }
1471
1472    /// Sending a message results in the model streaming a response, which could include tool calls.
1473    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1474    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1475    pub fn send<T>(
1476        &mut self,
1477        id: UserMessageId,
1478        content: impl IntoIterator<Item = T>,
1479        cx: &mut Context<Self>,
1480    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1481    where
1482        T: Into<UserMessageContent>,
1483    {
1484        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1485        log::debug!("Thread::send content: {:?}", content);
1486
1487        self.messages
1488            .push(Message::User(UserMessage { id, content }));
1489        cx.notify();
1490
1491        self.send_existing(cx)
1492    }
1493
1494    pub fn send_existing(
1495        &mut self,
1496        cx: &mut Context<Self>,
1497    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1498        let model = self.model().context("No language model configured")?;
1499
1500        log::info!("Thread::send called with model: {}", model.name().0);
1501        self.advance_prompt_id();
1502
1503        log::debug!("Total messages in thread: {}", self.messages.len());
1504        self.run_turn(cx)
1505    }
1506
1507    pub fn push_acp_user_block(
1508        &mut self,
1509        id: UserMessageId,
1510        blocks: impl IntoIterator<Item = acp::ContentBlock>,
1511        path_style: PathStyle,
1512        cx: &mut Context<Self>,
1513    ) {
1514        let content = blocks
1515            .into_iter()
1516            .map(|block| UserMessageContent::from_content_block(block, path_style))
1517            .collect::<Vec<_>>();
1518        self.messages
1519            .push(Message::User(UserMessage { id, content }));
1520        cx.notify();
1521    }
1522
1523    pub fn push_acp_agent_block(&mut self, block: acp::ContentBlock, cx: &mut Context<Self>) {
1524        let text = match block {
1525            acp::ContentBlock::Text(text_content) => text_content.text,
1526            acp::ContentBlock::Image(_) => "[image]".to_string(),
1527            acp::ContentBlock::Audio(_) => "[audio]".to_string(),
1528            acp::ContentBlock::ResourceLink(resource_link) => resource_link.uri,
1529            acp::ContentBlock::Resource(resource) => match resource.resource {
1530                acp::EmbeddedResourceResource::TextResourceContents(resource) => resource.uri,
1531                acp::EmbeddedResourceResource::BlobResourceContents(resource) => resource.uri,
1532                _ => "[resource]".to_string(),
1533            },
1534            _ => "[unknown]".to_string(),
1535        };
1536
1537        self.messages.push(Message::Agent(AgentMessage {
1538            content: vec![AgentMessageContent::Text(text)],
1539            ..Default::default()
1540        }));
1541        cx.notify();
1542    }
1543
1544    #[cfg(feature = "eval")]
1545    pub fn proceed(
1546        &mut self,
1547        cx: &mut Context<Self>,
1548    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1549        self.run_turn(cx)
1550    }
1551
1552    fn run_turn(
1553        &mut self,
1554        cx: &mut Context<Self>,
1555    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1556        // Flush the old pending message synchronously before cancelling,
1557        // to avoid a race where the detached cancel task might flush the NEW
1558        // turn's pending message instead of the old one.
1559        self.flush_pending_message(cx);
1560        self.cancel(cx).detach();
1561
1562        let model = self.model.clone().context("No language model configured")?;
1563        let profile = AgentSettings::get_global(cx)
1564            .profiles
1565            .get(&self.profile_id)
1566            .context("Profile not found")?;
1567        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
1568        let event_stream = ThreadEventStream(events_tx);
1569        let message_ix = self.messages.len().saturating_sub(1);
1570        self.clear_summary();
1571        let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
1572        self.running_turn = Some(RunningTurn {
1573            event_stream: event_stream.clone(),
1574            tools: self.enabled_tools(profile, &model, cx),
1575            cancellation_tx,
1576            _task: cx.spawn(async move |this, cx| {
1577                log::debug!("Starting agent turn execution");
1578
1579                let turn_result = Self::run_turn_internal(
1580                    &this,
1581                    model,
1582                    &event_stream,
1583                    cancellation_rx.clone(),
1584                    cx,
1585                )
1586                .await;
1587
1588                // Check if we were cancelled - if so, cancel() already took running_turn
1589                // and we shouldn't touch it (it might be a NEW turn now)
1590                let was_cancelled = *cancellation_rx.borrow();
1591                if was_cancelled {
1592                    log::debug!("Turn was cancelled, skipping cleanup");
1593                    return;
1594                }
1595
1596                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));
1597
1598                match turn_result {
1599                    Ok(()) => {
1600                        log::debug!("Turn execution completed");
1601                        event_stream.send_stop(acp::StopReason::EndTurn);
1602                    }
1603                    Err(error) => {
1604                        log::error!("Turn execution failed: {:?}", error);
1605                        match error.downcast::<CompletionError>() {
1606                            Ok(CompletionError::Refusal) => {
1607                                event_stream.send_stop(acp::StopReason::Refusal);
1608                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
1609                            }
1610                            Ok(CompletionError::MaxTokens) => {
1611                                event_stream.send_stop(acp::StopReason::MaxTokens);
1612                            }
1613                            Ok(CompletionError::Other(error)) | Err(error) => {
1614                                event_stream.send_error(error);
1615                            }
1616                        }
1617                    }
1618                }
1619
1620                _ = this.update(cx, |this, _| this.running_turn.take());
1621            }),
1622        });
1623        Ok(events_rx)
1624    }
1625
1626    async fn run_turn_internal(
1627        this: &WeakEntity<Self>,
1628        model: Arc<dyn LanguageModel>,
1629        event_stream: &ThreadEventStream,
1630        mut cancellation_rx: watch::Receiver<bool>,
1631        cx: &mut AsyncApp,
1632    ) -> Result<()> {
1633        let mut attempt = 0;
1634        let mut intent = CompletionIntent::UserPrompt;
1635        loop {
1636            let request =
1637                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;
1638
1639            telemetry::event!(
1640                "Agent Thread Completion",
1641                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
1642                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
1643                model = model.telemetry_id(),
1644                model_provider = model.provider_id().to_string(),
1645                attempt
1646            );
1647
1648            log::debug!("Calling model.stream_completion, attempt {}", attempt);
1649
1650            let (mut events, mut error) = match model.stream_completion(request, cx).await {
1651                Ok(events) => (events.fuse(), None),
1652                Err(err) => (stream::empty().boxed().fuse(), Some(err)),
1653            };
1654            let mut tool_results = FuturesUnordered::new();
1655            let mut cancelled = false;
1656            loop {
1657                // Race between getting the first event and cancellation
1658                let first_event = futures::select! {
1659                    event = events.next().fuse() => event,
1660                    _ = cancellation_rx.changed().fuse() => {
1661                        if *cancellation_rx.borrow() {
1662                            cancelled = true;
1663                            break;
1664                        }
1665                        continue;
1666                    }
1667                };
1668                let Some(first_event) = first_event else {
1669                    break;
1670                };
1671
1672                // Collect all immediately available events to process as a batch
1673                let mut batch = vec![first_event];
1674                while let Some(event) = events.next().now_or_never().flatten() {
1675                    batch.push(event);
1676                }
1677
1678                // Process the batch in a single update
1679                let batch_result = this.update(cx, |this, cx| {
1680                    let mut batch_tool_results = Vec::new();
1681                    let mut batch_error = None;
1682
1683                    for event in batch {
1684                        log::trace!("Received completion event: {:?}", event);
1685                        match event {
1686                            Ok(event) => {
1687                                match this.handle_completion_event(
1688                                    event,
1689                                    event_stream,
1690                                    cancellation_rx.clone(),
1691                                    cx,
1692                                ) {
1693                                    Ok(Some(task)) => batch_tool_results.push(task),
1694                                    Ok(None) => {}
1695                                    Err(err) => {
1696                                        batch_error = Some(err);
1697                                        break;
1698                                    }
1699                                }
1700                            }
1701                            Err(err) => {
1702                                batch_error = Some(err.into());
1703                                break;
1704                            }
1705                        }
1706                    }
1707
1708                    cx.notify();
1709                    (batch_tool_results, batch_error)
1710                })?;
1711
1712                tool_results.extend(batch_result.0);
1713                if let Some(err) = batch_result.1 {
1714                    error = Some(err.downcast()?);
1715                    break;
1716                }
1717            }
1718
1719            // Drop the stream to release the rate limit permit before tool execution.
1720            // The stream holds a semaphore guard that limits concurrent requests.
1721            // Without this, the permit would be held during potentially long-running
1722            // tool execution, which could cause deadlocks when tools spawn subagents
1723            // that need their own permits.
1724            drop(events);
1725
1726            let end_turn = tool_results.is_empty();
1727            while let Some(tool_result) = tool_results.next().await {
1728                log::debug!("Tool finished {:?}", tool_result);
1729
1730                event_stream.update_tool_call_fields(
1731                    &tool_result.tool_use_id,
1732                    acp::ToolCallUpdateFields::new()
1733                        .status(if tool_result.is_error {
1734                            acp::ToolCallStatus::Failed
1735                        } else {
1736                            acp::ToolCallStatus::Completed
1737                        })
1738                        .raw_output(tool_result.output.clone()),
1739                );
1740                this.update(cx, |this, _cx| {
1741                    this.pending_message()
1742                        .tool_results
1743                        .insert(tool_result.tool_use_id.clone(), tool_result);
1744                })?;
1745            }
1746
1747            this.update(cx, |this, cx| {
1748                this.flush_pending_message(cx);
1749                if this.title.is_none() && this.pending_title_generation.is_none() {
1750                    this.generate_title(cx);
1751                }
1752            })?;
1753
1754            if cancelled {
1755                log::debug!("Turn cancelled by user, exiting");
1756                return Ok(());
1757            }
1758
1759            if let Some(error) = error {
1760                attempt += 1;
1761                let retry = this.update(cx, |this, cx| {
1762                    let user_store = this.user_store.read(cx);
1763                    this.handle_completion_error(error, attempt, user_store.plan())
1764                })??;
1765                let timer = cx.background_executor().timer(retry.duration);
1766                event_stream.send_retry(retry);
1767                timer.await;
1768                this.update(cx, |this, _cx| {
1769                    if let Some(Message::Agent(message)) = this.messages.last() {
1770                        if message.tool_results.is_empty() {
1771                            intent = CompletionIntent::UserPrompt;
1772                            this.messages.push(Message::Resume);
1773                        }
1774                    }
1775                })?;
1776            } else if end_turn {
1777                return Ok(());
1778            } else {
1779                let has_queued = this.update(cx, |this, _| this.has_queued_message())?;
1780                if has_queued {
1781                    log::debug!("Queued message found, ending turn at message boundary");
1782                    return Ok(());
1783                }
1784                intent = CompletionIntent::ToolResults;
1785                attempt = 0;
1786            }
1787        }
1788    }
1789
1790    fn handle_completion_error(
1791        &mut self,
1792        error: LanguageModelCompletionError,
1793        attempt: u8,
1794        plan: Option<Plan>,
1795    ) -> Result<acp_thread::RetryStatus> {
1796        let Some(model) = self.model.as_ref() else {
1797            return Err(anyhow!(error));
1798        };
1799
1800        let auto_retry = if model.provider_id() == ZED_CLOUD_PROVIDER_ID {
1801            plan.is_some()
1802        } else {
1803            true
1804        };
1805
1806        if !auto_retry {
1807            return Err(anyhow!(error));
1808        }
1809
1810        let Some(strategy) = Self::retry_strategy_for(&error) else {
1811            return Err(anyhow!(error));
1812        };
1813
1814        let max_attempts = match &strategy {
1815            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1816            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1817        };
1818
1819        if attempt > max_attempts {
1820            return Err(anyhow!(error));
1821        }
1822
1823        let delay = match &strategy {
1824            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1825                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1826                Duration::from_secs(delay_secs)
1827            }
1828            RetryStrategy::Fixed { delay, .. } => *delay,
1829        };
1830        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1831
1832        Ok(acp_thread::RetryStatus {
1833            last_error: error.to_string().into(),
1834            attempt: attempt as usize,
1835            max_attempts: max_attempts as usize,
1836            started_at: Instant::now(),
1837            duration: delay,
1838        })
1839    }
1840
1841    /// A helper method that's called on every streamed completion event.
1842    /// Returns an optional tool result task, which the main agentic loop will
1843    /// send back to the model when it resolves.
1844    fn handle_completion_event(
1845        &mut self,
1846        event: LanguageModelCompletionEvent,
1847        event_stream: &ThreadEventStream,
1848        cancellation_rx: watch::Receiver<bool>,
1849        cx: &mut Context<Self>,
1850    ) -> Result<Option<Task<LanguageModelToolResult>>> {
1851        log::trace!("Handling streamed completion event: {:?}", event);
1852        use LanguageModelCompletionEvent::*;
1853
1854        match event {
1855            StartMessage { .. } => {
1856                self.flush_pending_message(cx);
1857                self.pending_message = Some(AgentMessage::default());
1858            }
1859            Text(new_text) => self.handle_text_event(new_text, event_stream),
1860            Thinking { text, signature } => {
1861                self.handle_thinking_event(text, signature, event_stream)
1862            }
1863            RedactedThinking { data } => self.handle_redacted_thinking_event(data),
1864            ReasoningDetails(details) => {
1865                let last_message = self.pending_message();
1866                // Store the last non-empty reasoning_details (overwrites earlier ones)
1867                // This ensures we keep the encrypted reasoning with signatures, not the early text reasoning
1868                if let serde_json::Value::Array(ref arr) = details {
1869                    if !arr.is_empty() {
1870                        last_message.reasoning_details = Some(details);
1871                    }
1872                } else {
1873                    last_message.reasoning_details = Some(details);
1874                }
1875            }
1876            ToolUse(tool_use) => {
1877                return Ok(self.handle_tool_use_event(tool_use, event_stream, cancellation_rx, cx));
1878            }
1879            ToolUseJsonParseError {
1880                id,
1881                tool_name,
1882                raw_input,
1883                json_parse_error,
1884            } => {
1885                return Ok(Some(Task::ready(
1886                    self.handle_tool_use_json_parse_error_event(
1887                        id,
1888                        tool_name,
1889                        raw_input,
1890                        json_parse_error,
1891                    ),
1892                )));
1893            }
1894            UsageUpdate(usage) => {
1895                telemetry::event!(
1896                    "Agent Thread Completion Usage Updated",
1897                    thread_id = self.id.to_string(),
1898                    prompt_id = self.prompt_id.to_string(),
1899                    model = self.model.as_ref().map(|m| m.telemetry_id()),
1900                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
1901                    input_tokens = usage.input_tokens,
1902                    output_tokens = usage.output_tokens,
1903                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
1904                    cache_read_input_tokens = usage.cache_read_input_tokens,
1905                );
1906                self.update_token_usage(usage, cx);
1907            }
1908            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
1909            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
1910            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
1911            Started | Queued { .. } => {}
1912        }
1913
1914        Ok(None)
1915    }
1916
1917    fn handle_text_event(&mut self, new_text: String, event_stream: &ThreadEventStream) {
1918        event_stream.send_text(&new_text);
1919
1920        let last_message = self.pending_message();
1921        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1922            text.push_str(&new_text);
1923        } else {
1924            last_message
1925                .content
1926                .push(AgentMessageContent::Text(new_text));
1927        }
1928    }
1929
1930    fn handle_thinking_event(
1931        &mut self,
1932        new_text: String,
1933        new_signature: Option<String>,
1934        event_stream: &ThreadEventStream,
1935    ) {
1936        event_stream.send_thinking(&new_text);
1937
1938        let last_message = self.pending_message();
1939        if let Some(AgentMessageContent::Thinking { text, signature }) =
1940            last_message.content.last_mut()
1941        {
1942            text.push_str(&new_text);
1943            *signature = new_signature.or(signature.take());
1944        } else {
1945            last_message.content.push(AgentMessageContent::Thinking {
1946                text: new_text,
1947                signature: new_signature,
1948            });
1949        }
1950    }
1951
1952    fn handle_redacted_thinking_event(&mut self, data: String) {
1953        let last_message = self.pending_message();
1954        last_message
1955            .content
1956            .push(AgentMessageContent::RedactedThinking(data));
1957    }
1958
1959    fn handle_tool_use_event(
1960        &mut self,
1961        tool_use: LanguageModelToolUse,
1962        event_stream: &ThreadEventStream,
1963        cancellation_rx: watch::Receiver<bool>,
1964        cx: &mut Context<Self>,
1965    ) -> Option<Task<LanguageModelToolResult>> {
1966        cx.notify();
1967
1968        let tool = self.tool(tool_use.name.as_ref());
1969        let mut title = SharedString::from(&tool_use.name);
1970        let mut kind = acp::ToolKind::Other;
1971        if let Some(tool) = tool.as_ref() {
1972            title = tool.initial_title(tool_use.input.clone(), cx);
1973            kind = tool.kind();
1974        }
1975
1976        // Ensure the last message ends in the current tool use
1977        let last_message = self.pending_message();
1978        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
1979            if let AgentMessageContent::ToolUse(last_tool_use) = content {
1980                if last_tool_use.id == tool_use.id {
1981                    *last_tool_use = tool_use.clone();
1982                    false
1983                } else {
1984                    true
1985                }
1986            } else {
1987                true
1988            }
1989        });
1990
1991        if push_new_tool_use {
1992            event_stream.send_tool_call(
1993                &tool_use.id,
1994                &tool_use.name,
1995                title,
1996                kind,
1997                tool_use.input.clone(),
1998            );
1999            last_message
2000                .content
2001                .push(AgentMessageContent::ToolUse(tool_use.clone()));
2002        } else {
2003            event_stream.update_tool_call_fields(
2004                &tool_use.id,
2005                acp::ToolCallUpdateFields::new()
2006                    .title(title.as_str())
2007                    .kind(kind)
2008                    .raw_input(tool_use.input.clone()),
2009            );
2010        }
2011
2012        if !tool_use.is_input_complete {
2013            return None;
2014        }
2015
2016        let Some(tool) = tool else {
2017            let content = format!("No tool named {} exists", tool_use.name);
2018            return Some(Task::ready(LanguageModelToolResult {
2019                content: LanguageModelToolResultContent::Text(Arc::from(content)),
2020                tool_use_id: tool_use.id,
2021                tool_name: tool_use.name,
2022                is_error: true,
2023                output: None,
2024            }));
2025        };
2026
2027        let fs = self.project.read(cx).fs().clone();
2028        let tool_event_stream = ToolCallEventStream::new(
2029            tool_use.id.clone(),
2030            event_stream.clone(),
2031            Some(fs),
2032            cancellation_rx,
2033        );
2034        tool_event_stream.update_fields(
2035            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress),
2036        );
2037        let supports_images = self.model().is_some_and(|model| model.supports_images());
2038        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
2039        log::debug!("Running tool {}", tool_use.name);
2040        Some(cx.foreground_executor().spawn(async move {
2041            let tool_result = tool_result.await.and_then(|output| {
2042                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
2043                    && !supports_images
2044                {
2045                    return Err(anyhow!(
2046                        "Attempted to read an image, but this model doesn't support it.",
2047                    ));
2048                }
2049                Ok(output)
2050            });
2051
2052            match tool_result {
2053                Ok(output) => LanguageModelToolResult {
2054                    tool_use_id: tool_use.id,
2055                    tool_name: tool_use.name,
2056                    is_error: false,
2057                    content: output.llm_output,
2058                    output: Some(output.raw_output),
2059                },
2060                Err(error) => LanguageModelToolResult {
2061                    tool_use_id: tool_use.id,
2062                    tool_name: tool_use.name,
2063                    is_error: true,
2064                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
2065                    output: Some(error.to_string().into()),
2066                },
2067            }
2068        }))
2069    }
2070
2071    fn handle_tool_use_json_parse_error_event(
2072        &mut self,
2073        tool_use_id: LanguageModelToolUseId,
2074        tool_name: Arc<str>,
2075        raw_input: Arc<str>,
2076        json_parse_error: String,
2077    ) -> LanguageModelToolResult {
2078        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
2079        LanguageModelToolResult {
2080            tool_use_id,
2081            tool_name,
2082            is_error: true,
2083            content: LanguageModelToolResultContent::Text(tool_output.into()),
2084            output: Some(serde_json::Value::String(raw_input.to_string())),
2085        }
2086    }
2087
2088    pub fn title(&self) -> SharedString {
2089        self.title.clone().unwrap_or("New Thread".into())
2090    }
2091
2092    pub fn is_generating_summary(&self) -> bool {
2093        self.pending_summary_generation.is_some()
2094    }
2095
2096    pub fn is_generating_title(&self) -> bool {
2097        self.pending_title_generation.is_some()
2098    }
2099
2100    pub fn summary(&mut self, cx: &mut Context<Self>) -> Shared<Task<Option<SharedString>>> {
2101        if let Some(summary) = self.summary.as_ref() {
2102            return Task::ready(Some(summary.clone())).shared();
2103        }
2104        if let Some(task) = self.pending_summary_generation.clone() {
2105            return task;
2106        }
2107        let Some(model) = self.summarization_model.clone() else {
2108            log::error!("No summarization model available");
2109            return Task::ready(None).shared();
2110        };
2111        let mut request = LanguageModelRequest {
2112            intent: Some(CompletionIntent::ThreadContextSummarization),
2113            temperature: AgentSettings::temperature_for_model(&model, cx),
2114            ..Default::default()
2115        };
2116
2117        for message in &self.messages {
2118            request.messages.extend(message.to_request());
2119        }
2120
2121        request.messages.push(LanguageModelRequestMessage {
2122            role: Role::User,
2123            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
2124            cache: false,
2125            reasoning_details: None,
2126        });
2127
2128        let task = cx
2129            .spawn(async move |this, cx| {
2130                let mut summary = String::new();
2131                let mut messages = model.stream_completion(request, cx).await.log_err()?;
2132                while let Some(event) = messages.next().await {
2133                    let event = event.log_err()?;
2134                    let text = match event {
2135                        LanguageModelCompletionEvent::Text(text) => text,
2136                        _ => continue,
2137                    };
2138
2139                    let mut lines = text.lines();
2140                    summary.extend(lines.next());
2141                }
2142
2143                log::debug!("Setting summary: {}", summary);
2144                let summary = SharedString::from(summary);
2145
2146                this.update(cx, |this, cx| {
2147                    this.summary = Some(summary.clone());
2148                    this.pending_summary_generation = None;
2149                    cx.notify()
2150                })
2151                .ok()?;
2152
2153                Some(summary)
2154            })
2155            .shared();
2156        self.pending_summary_generation = Some(task.clone());
2157        task
2158    }
2159
2160    pub fn generate_title(&mut self, cx: &mut Context<Self>) {
2161        let Some(model) = self.summarization_model.clone() else {
2162            return;
2163        };
2164
2165        log::debug!(
2166            "Generating title with model: {:?}",
2167            self.summarization_model.as_ref().map(|model| model.name())
2168        );
2169        let mut request = LanguageModelRequest {
2170            intent: Some(CompletionIntent::ThreadSummarization),
2171            temperature: AgentSettings::temperature_for_model(&model, cx),
2172            ..Default::default()
2173        };
2174
2175        for message in &self.messages {
2176            request.messages.extend(message.to_request());
2177        }
2178
2179        request.messages.push(LanguageModelRequestMessage {
2180            role: Role::User,
2181            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
2182            cache: false,
2183            reasoning_details: None,
2184        });
2185        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
2186            let mut title = String::new();
2187
2188            let generate = async {
2189                let mut messages = model.stream_completion(request, cx).await?;
2190                while let Some(event) = messages.next().await {
2191                    let event = event?;
2192                    let text = match event {
2193                        LanguageModelCompletionEvent::Text(text) => text,
2194                        _ => continue,
2195                    };
2196
2197                    let mut lines = text.lines();
2198                    title.extend(lines.next());
2199
2200                    // Stop if the LLM generated multiple lines.
2201                    if lines.next().is_some() {
2202                        break;
2203                    }
2204                }
2205                anyhow::Ok(())
2206            };
2207
2208            if generate.await.context("failed to generate title").is_ok() {
2209                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
2210            }
2211            _ = this.update(cx, |this, _| this.pending_title_generation = None);
2212        }));
2213    }
2214
2215    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
2216        self.pending_title_generation = None;
2217        if Some(&title) != self.title.as_ref() {
2218            self.title = Some(title);
2219            cx.emit(TitleUpdated);
2220            cx.notify();
2221        }
2222    }
2223
2224    fn clear_summary(&mut self) {
2225        self.summary = None;
2226        self.pending_summary_generation = None;
2227    }
2228
2229    fn last_user_message(&self) -> Option<&UserMessage> {
2230        self.messages
2231            .iter()
2232            .rev()
2233            .find_map(|message| match message {
2234                Message::User(user_message) => Some(user_message),
2235                Message::Agent(_) => None,
2236                Message::Resume => None,
2237            })
2238    }
2239
2240    fn pending_message(&mut self) -> &mut AgentMessage {
2241        self.pending_message.get_or_insert_default()
2242    }
2243
2244    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
2245        let Some(mut message) = self.pending_message.take() else {
2246            return;
2247        };
2248
2249        if message.content.is_empty() {
2250            return;
2251        }
2252
2253        for content in &message.content {
2254            let AgentMessageContent::ToolUse(tool_use) = content else {
2255                continue;
2256            };
2257
2258            if !message.tool_results.contains_key(&tool_use.id) {
2259                message.tool_results.insert(
2260                    tool_use.id.clone(),
2261                    LanguageModelToolResult {
2262                        tool_use_id: tool_use.id.clone(),
2263                        tool_name: tool_use.name.clone(),
2264                        is_error: true,
2265                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
2266                        output: None,
2267                    },
2268                );
2269            }
2270        }
2271
2272        self.messages.push(Message::Agent(message));
2273        self.updated_at = Utc::now();
2274        self.clear_summary();
2275        cx.notify()
2276    }
2277
2278    pub(crate) fn build_completion_request(
2279        &self,
2280        completion_intent: CompletionIntent,
2281        cx: &App,
2282    ) -> Result<LanguageModelRequest> {
2283        let model = self.model().context("No language model configured")?;
2284        let tools = if let Some(turn) = self.running_turn.as_ref() {
2285            turn.tools
2286                .iter()
2287                .filter_map(|(tool_name, tool)| {
2288                    log::trace!("Including tool: {}", tool_name);
2289                    Some(LanguageModelRequestTool {
2290                        name: tool_name.to_string(),
2291                        description: tool.description().to_string(),
2292                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
2293                    })
2294                })
2295                .collect::<Vec<_>>()
2296        } else {
2297            Vec::new()
2298        };
2299
2300        log::debug!("Building completion request");
2301        log::debug!("Completion intent: {:?}", completion_intent);
2302
2303        let available_tools: Vec<_> = self
2304            .running_turn
2305            .as_ref()
2306            .map(|turn| turn.tools.keys().cloned().collect())
2307            .unwrap_or_default();
2308
2309        log::debug!("Request includes {} tools", available_tools.len());
2310        let messages = self.build_request_messages(available_tools, cx);
2311        log::debug!("Request will include {} messages", messages.len());
2312
2313        let request = LanguageModelRequest {
2314            thread_id: Some(self.id.to_string()),
2315            prompt_id: Some(self.prompt_id.to_string()),
2316            intent: Some(completion_intent),
2317            messages,
2318            tools,
2319            tool_choice: None,
2320            stop: Vec::new(),
2321            temperature: AgentSettings::temperature_for_model(model, cx),
2322            thinking_allowed: self.thinking_enabled,
2323        };
2324
2325        log::debug!("Completion request built successfully");
2326        Ok(request)
2327    }
2328
2329    fn enabled_tools(
2330        &self,
2331        profile: &AgentProfileSettings,
2332        model: &Arc<dyn LanguageModel>,
2333        cx: &App,
2334    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
2335        fn truncate(tool_name: &SharedString) -> SharedString {
2336            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
2337                let mut truncated = tool_name.to_string();
2338                truncated.truncate(MAX_TOOL_NAME_LENGTH);
2339                truncated.into()
2340            } else {
2341                tool_name.clone()
2342            }
2343        }
2344
2345        let use_streaming_edit_tool = false;
2346
2347        let mut tools = self
2348            .tools
2349            .iter()
2350            .filter_map(|(tool_name, tool)| {
2351                // For streaming_edit_file, check profile against "edit_file" since that's what users configure
2352                let profile_tool_name = if tool_name == "streaming_edit_file" {
2353                    "edit_file"
2354                } else {
2355                    tool_name.as_ref()
2356                };
2357
2358                if tool.supports_provider(&model.provider_id())
2359                    && profile.is_tool_enabled(profile_tool_name)
2360                {
2361                    match (tool_name.as_ref(), use_streaming_edit_tool) {
2362                        ("streaming_edit_file", false) | ("edit_file", true) => None,
2363                        ("streaming_edit_file", true) => {
2364                            // Expose streaming tool as "edit_file"
2365                            Some((SharedString::from("edit_file"), tool.clone()))
2366                        }
2367                        _ => Some((truncate(tool_name), tool.clone())),
2368                    }
2369                } else {
2370                    None
2371                }
2372            })
2373            .collect::<BTreeMap<_, _>>();
2374
2375        let mut context_server_tools = Vec::new();
2376        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
2377        let mut duplicate_tool_names = HashSet::default();
2378        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
2379            for (tool_name, tool) in server_tools {
2380                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
2381                    let tool_name = truncate(tool_name);
2382                    if !seen_tools.insert(tool_name.clone()) {
2383                        duplicate_tool_names.insert(tool_name.clone());
2384                    }
2385                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
2386                }
2387            }
2388        }
2389
2390        // When there are duplicate tool names, disambiguate by prefixing them
2391        // with the server ID. In the rare case there isn't enough space for the
2392        // disambiguated tool name, keep only the last tool with this name.
2393        for (server_id, tool_name, tool) in context_server_tools {
2394            if duplicate_tool_names.contains(&tool_name) {
2395                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
2396                if available >= 2 {
2397                    let mut disambiguated = server_id.0.to_string();
2398                    disambiguated.truncate(available - 1);
2399                    disambiguated.push('_');
2400                    disambiguated.push_str(&tool_name);
2401                    tools.insert(disambiguated.into(), tool.clone());
2402                } else {
2403                    tools.insert(tool_name, tool.clone());
2404                }
2405            } else {
2406                tools.insert(tool_name, tool.clone());
2407            }
2408        }
2409
2410        tools
2411    }
2412
2413    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
2414        self.running_turn.as_ref()?.tools.get(name).cloned()
2415    }
2416
2417    pub fn has_tool(&self, name: &str) -> bool {
2418        self.running_turn
2419            .as_ref()
2420            .is_some_and(|turn| turn.tools.contains_key(name))
2421    }
2422
2423    #[cfg(any(test, feature = "test-support"))]
2424    pub fn has_registered_tool(&self, name: &str) -> bool {
2425        self.tools.contains_key(name)
2426    }
2427
2428    pub fn registered_tool_names(&self) -> Vec<SharedString> {
2429        self.tools.keys().cloned().collect()
2430    }
2431
2432    pub fn register_running_subagent(&mut self, subagent: WeakEntity<Thread>) {
2433        self.running_subagents.push(subagent);
2434    }
2435
2436    pub fn unregister_running_subagent(&mut self, subagent: &WeakEntity<Thread>) {
2437        self.running_subagents
2438            .retain(|s| s.entity_id() != subagent.entity_id());
2439    }
2440
2441    pub fn running_subagent_count(&self) -> usize {
2442        self.running_subagents
2443            .iter()
2444            .filter(|s| s.upgrade().is_some())
2445            .count()
2446    }
2447
2448    pub fn is_subagent(&self) -> bool {
2449        self.subagent_context.is_some()
2450    }
2451
2452    pub fn depth(&self) -> u8 {
2453        self.subagent_context.as_ref().map(|c| c.depth).unwrap_or(0)
2454    }
2455
2456    pub fn is_turn_complete(&self) -> bool {
2457        self.running_turn.is_none()
2458    }
2459
2460    pub fn submit_user_message(
2461        &mut self,
2462        content: impl Into<String>,
2463        cx: &mut Context<Self>,
2464    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2465        let content = content.into();
2466        self.messages.push(Message::User(UserMessage {
2467            id: UserMessageId::new(),
2468            content: vec![UserMessageContent::Text(content)],
2469        }));
2470        cx.notify();
2471        self.send_existing(cx)
2472    }
2473
2474    pub fn interrupt_for_summary(
2475        &mut self,
2476        cx: &mut Context<Self>,
2477    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2478        let context = self
2479            .subagent_context
2480            .as_ref()
2481            .context("Not a subagent thread")?;
2482        let prompt = context.context_low_prompt.clone();
2483        self.cancel(cx).detach();
2484        self.submit_user_message(prompt, cx)
2485    }
2486
2487    pub fn request_final_summary(
2488        &mut self,
2489        cx: &mut Context<Self>,
2490    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
2491        let context = self
2492            .subagent_context
2493            .as_ref()
2494            .context("Not a subagent thread")?;
2495        let prompt = context.summary_prompt.clone();
2496        self.submit_user_message(prompt, cx)
2497    }
2498
2499    fn build_request_messages(
2500        &self,
2501        available_tools: Vec<SharedString>,
2502        cx: &App,
2503    ) -> Vec<LanguageModelRequestMessage> {
2504        log::trace!(
2505            "Building request messages from {} thread messages",
2506            self.messages.len()
2507        );
2508
2509        let system_prompt = SystemPromptTemplate {
2510            project: self.project_context.read(cx),
2511            available_tools,
2512            model_name: self.model.as_ref().map(|m| m.name().0.to_string()),
2513        }
2514        .render(&self.templates)
2515        .context("failed to build system prompt")
2516        .expect("Invalid template");
2517        let mut messages = vec![LanguageModelRequestMessage {
2518            role: Role::System,
2519            content: vec![system_prompt.into()],
2520            cache: false,
2521            reasoning_details: None,
2522        }];
2523        for message in &self.messages {
2524            messages.extend(message.to_request());
2525        }
2526
2527        if let Some(last_message) = messages.last_mut() {
2528            last_message.cache = true;
2529        }
2530
2531        if let Some(message) = self.pending_message.as_ref() {
2532            messages.extend(message.to_request());
2533        }
2534
2535        messages
2536    }
2537
2538    pub fn to_markdown(&self) -> String {
2539        let mut markdown = String::new();
2540        for (ix, message) in self.messages.iter().enumerate() {
2541            if ix > 0 {
2542                markdown.push('\n');
2543            }
2544            markdown.push_str(&message.to_markdown());
2545        }
2546
2547        if let Some(message) = self.pending_message.as_ref() {
2548            markdown.push('\n');
2549            markdown.push_str(&message.to_markdown());
2550        }
2551
2552        markdown
2553    }
2554
2555    fn advance_prompt_id(&mut self) {
2556        self.prompt_id = PromptId::new();
2557    }
2558
2559    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
2560        use LanguageModelCompletionError::*;
2561        use http_client::StatusCode;
2562
2563        // General strategy here:
2564        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
2565        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to 4 times with exponential backoff.
2566        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry up to 3 times.
2567        match error {
2568            HttpResponseError {
2569                status_code: StatusCode::TOO_MANY_REQUESTS,
2570                ..
2571            } => Some(RetryStrategy::ExponentialBackoff {
2572                initial_delay: BASE_RETRY_DELAY,
2573                max_attempts: MAX_RETRY_ATTEMPTS,
2574            }),
2575            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
2576                Some(RetryStrategy::Fixed {
2577                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2578                    max_attempts: MAX_RETRY_ATTEMPTS,
2579                })
2580            }
2581            UpstreamProviderError {
2582                status,
2583                retry_after,
2584                ..
2585            } => match *status {
2586                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
2587                    Some(RetryStrategy::Fixed {
2588                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2589                        max_attempts: MAX_RETRY_ATTEMPTS,
2590                    })
2591                }
2592                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
2593                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2594                    // Internal Server Error could be anything, retry up to 3 times.
2595                    max_attempts: 3,
2596                }),
2597                status => {
2598                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
2599                    // but we frequently get them in practice. See https://http.dev/529
2600                    if status.as_u16() == 529 {
2601                        Some(RetryStrategy::Fixed {
2602                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2603                            max_attempts: MAX_RETRY_ATTEMPTS,
2604                        })
2605                    } else {
2606                        Some(RetryStrategy::Fixed {
2607                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2608                            max_attempts: 2,
2609                        })
2610                    }
2611                }
2612            },
2613            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
2614                delay: BASE_RETRY_DELAY,
2615                max_attempts: 3,
2616            }),
2617            ApiReadResponseError { .. }
2618            | HttpSend { .. }
2619            | DeserializeResponse { .. }
2620            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
2621                delay: BASE_RETRY_DELAY,
2622                max_attempts: 3,
2623            }),
2624            // Retrying these errors definitely shouldn't help.
2625            HttpResponseError {
2626                status_code:
2627                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
2628                ..
2629            }
2630            | AuthenticationError { .. }
2631            | PermissionError { .. }
2632            | NoApiKey { .. }
2633            | ApiEndpointNotFound { .. }
2634            | PromptTooLarge { .. } => None,
2635            // These errors might be transient, so retry them
2636            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
2637                delay: BASE_RETRY_DELAY,
2638                max_attempts: 1,
2639            }),
2640            // Retry all other 4xx and 5xx errors once.
2641            HttpResponseError { status_code, .. }
2642                if status_code.is_client_error() || status_code.is_server_error() =>
2643            {
2644                Some(RetryStrategy::Fixed {
2645                    delay: BASE_RETRY_DELAY,
2646                    max_attempts: 3,
2647                })
2648            }
2649            Other(err) if err.is::<language_model::PaymentRequiredError>() => {
2650                // Retrying won't help for Payment Required errors.
2651                None
2652            }
2653            // Conservatively assume that any other errors are non-retryable
2654            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
2655                delay: BASE_RETRY_DELAY,
2656                max_attempts: 2,
2657            }),
2658        }
2659    }
2660}
2661
2662struct RunningTurn {
2663    /// Holds the task that handles agent interaction until the end of the turn.
2664    /// Survives across multiple requests as the model performs tool calls and
2665    /// we run tools, report their results.
2666    _task: Task<()>,
2667    /// The current event stream for the running turn. Used to report a final
2668    /// cancellation event if we cancel the turn.
2669    event_stream: ThreadEventStream,
2670    /// The tools that were enabled for this turn.
2671    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
2672    /// Sender to signal tool cancellation. When cancel is called, this is
2673    /// set to true so all tools can detect user-initiated cancellation.
2674    cancellation_tx: watch::Sender<bool>,
2675}
2676
2677impl RunningTurn {
2678    fn cancel(mut self) -> Task<()> {
2679        log::debug!("Cancelling in progress turn");
2680        self.cancellation_tx.send(true).ok();
2681        self.event_stream.send_canceled();
2682        self._task
2683    }
2684}
2685
2686pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);
2687
2688impl EventEmitter<TokenUsageUpdated> for Thread {}
2689
2690pub struct TitleUpdated;
2691
2692impl EventEmitter<TitleUpdated> for Thread {}
2693
2694pub trait AgentTool
2695where
2696    Self: 'static + Sized,
2697{
2698    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
2699    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
2700
2701    fn name() -> &'static str;
2702
2703    fn description() -> SharedString {
2704        let schema = schemars::schema_for!(Self::Input);
2705        SharedString::new(
2706            schema
2707                .get("description")
2708                .and_then(|description| description.as_str())
2709                .unwrap_or_default(),
2710        )
2711    }
2712
2713    fn kind() -> acp::ToolKind;
2714
2715    /// The initial tool title to display. Can be updated during the tool run.
2716    fn initial_title(
2717        &self,
2718        input: Result<Self::Input, serde_json::Value>,
2719        cx: &mut App,
2720    ) -> SharedString;
2721
2722    /// Returns the JSON schema that describes the tool's input.
2723    fn input_schema(format: LanguageModelToolSchemaFormat) -> Schema {
2724        language_model::tool_schema::root_schema_for::<Self::Input>(format)
2725    }
2726
2727    /// Some tools rely on a provider for the underlying billing or other reasons.
2728    /// Allow the tool to check if they are compatible, or should be filtered out.
2729    fn supports_provider(_provider: &LanguageModelProviderId) -> bool {
2730        true
2731    }
2732
2733    /// Runs the tool with the provided input.
2734    fn run(
2735        self: Arc<Self>,
2736        input: Self::Input,
2737        event_stream: ToolCallEventStream,
2738        cx: &mut App,
2739    ) -> Task<Result<Self::Output>>;
2740
2741    /// Emits events for a previous execution of the tool.
2742    fn replay(
2743        &self,
2744        _input: Self::Input,
2745        _output: Self::Output,
2746        _event_stream: ToolCallEventStream,
2747        _cx: &mut App,
2748    ) -> Result<()> {
2749        Ok(())
2750    }
2751
2752    fn erase(self) -> Arc<dyn AnyAgentTool> {
2753        Arc::new(Erased(Arc::new(self)))
2754    }
2755
2756    /// Create a new instance of this tool bound to a different thread.
2757    /// This is used when creating subagents, so that tools like EditFileTool
2758    /// that hold a thread reference will use the subagent's thread instead
2759    /// of the parent's thread.
2760    /// Returns None if the tool doesn't need rebinding (most tools).
2761    fn rebind_thread(&self, _new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
2762        None
2763    }
2764}
2765
2766pub struct Erased<T>(T);
2767
2768pub struct AgentToolOutput {
2769    pub llm_output: LanguageModelToolResultContent,
2770    pub raw_output: serde_json::Value,
2771}
2772
2773pub trait AnyAgentTool {
2774    fn name(&self) -> SharedString;
2775    fn description(&self) -> SharedString;
2776    fn kind(&self) -> acp::ToolKind;
2777    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
2778    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
2779    fn supports_provider(&self, _provider: &LanguageModelProviderId) -> bool {
2780        true
2781    }
2782    fn run(
2783        self: Arc<Self>,
2784        input: serde_json::Value,
2785        event_stream: ToolCallEventStream,
2786        cx: &mut App,
2787    ) -> Task<Result<AgentToolOutput>>;
2788    fn replay(
2789        &self,
2790        input: serde_json::Value,
2791        output: serde_json::Value,
2792        event_stream: ToolCallEventStream,
2793        cx: &mut App,
2794    ) -> Result<()>;
2795    /// Create a new instance of this tool bound to a different thread.
2796    /// This is used when creating subagents, so that tools like EditFileTool
2797    /// that hold a thread reference will use the subagent's thread instead
2798    /// of the parent's thread.
2799    /// Returns None if the tool doesn't need rebinding (most tools).
2800    fn rebind_thread(&self, _new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
2801        None
2802    }
2803}
2804
2805impl<T> AnyAgentTool for Erased<Arc<T>>
2806where
2807    T: AgentTool,
2808{
2809    fn name(&self) -> SharedString {
2810        T::name().into()
2811    }
2812
2813    fn description(&self) -> SharedString {
2814        T::description()
2815    }
2816
2817    fn kind(&self) -> agent_client_protocol::ToolKind {
2818        T::kind()
2819    }
2820
2821    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2822        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2823        self.0.initial_title(parsed_input, _cx)
2824    }
2825
2826    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2827        let mut json = serde_json::to_value(T::input_schema(format))?;
2828        language_model::tool_schema::adapt_schema_to_format(&mut json, format)?;
2829        Ok(json)
2830    }
2831
2832    fn supports_provider(&self, provider: &LanguageModelProviderId) -> bool {
2833        T::supports_provider(provider)
2834    }
2835
2836    fn run(
2837        self: Arc<Self>,
2838        input: serde_json::Value,
2839        event_stream: ToolCallEventStream,
2840        cx: &mut App,
2841    ) -> Task<Result<AgentToolOutput>> {
2842        cx.spawn(async move |cx| {
2843            let input = serde_json::from_value(input)?;
2844            let output = cx
2845                .update(|cx| self.0.clone().run(input, event_stream, cx))
2846                .await?;
2847            let raw_output = serde_json::to_value(&output)?;
2848            Ok(AgentToolOutput {
2849                llm_output: output.into(),
2850                raw_output,
2851            })
2852        })
2853    }
2854
2855    fn replay(
2856        &self,
2857        input: serde_json::Value,
2858        output: serde_json::Value,
2859        event_stream: ToolCallEventStream,
2860        cx: &mut App,
2861    ) -> Result<()> {
2862        let input = serde_json::from_value(input)?;
2863        let output = serde_json::from_value(output)?;
2864        self.0.replay(input, output, event_stream, cx)
2865    }
2866
2867    fn rebind_thread(&self, new_thread: WeakEntity<Thread>) -> Option<Arc<dyn AnyAgentTool>> {
2868        self.0.rebind_thread(new_thread)
2869    }
2870}
2871
2872#[derive(Clone)]
2873struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2874
2875impl ThreadEventStream {
2876    fn send_user_message(&self, message: &UserMessage) {
2877        self.0
2878            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2879            .ok();
2880    }
2881
2882    fn send_text(&self, text: &str) {
2883        self.0
2884            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2885            .ok();
2886    }
2887
2888    fn send_thinking(&self, text: &str) {
2889        self.0
2890            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2891            .ok();
2892    }
2893
2894    fn send_tool_call(
2895        &self,
2896        id: &LanguageModelToolUseId,
2897        tool_name: &str,
2898        title: SharedString,
2899        kind: acp::ToolKind,
2900        input: serde_json::Value,
2901    ) {
2902        self.0
2903            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2904                id,
2905                tool_name,
2906                title.to_string(),
2907                kind,
2908                input,
2909            ))))
2910            .ok();
2911    }
2912
2913    fn initial_tool_call(
2914        id: &LanguageModelToolUseId,
2915        tool_name: &str,
2916        title: String,
2917        kind: acp::ToolKind,
2918        input: serde_json::Value,
2919    ) -> acp::ToolCall {
2920        acp::ToolCall::new(id.to_string(), title)
2921            .kind(kind)
2922            .raw_input(input)
2923            .meta(acp_thread::meta_with_tool_name(tool_name))
2924    }
2925
2926    fn update_tool_call_fields(
2927        &self,
2928        tool_use_id: &LanguageModelToolUseId,
2929        fields: acp::ToolCallUpdateFields,
2930    ) {
2931        self.0
2932            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2933                acp::ToolCallUpdate::new(tool_use_id.to_string(), fields).into(),
2934            )))
2935            .ok();
2936    }
2937
2938    fn send_retry(&self, status: acp_thread::RetryStatus) {
2939        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2940    }
2941
2942    fn send_stop(&self, reason: acp::StopReason) {
2943        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2944    }
2945
2946    fn send_canceled(&self) {
2947        self.0
2948            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2949            .ok();
2950    }
2951
2952    fn send_error(&self, error: impl Into<anyhow::Error>) {
2953        self.0.unbounded_send(Err(error.into())).ok();
2954    }
2955}
2956
2957#[derive(Clone)]
2958pub struct ToolCallEventStream {
2959    tool_use_id: LanguageModelToolUseId,
2960    stream: ThreadEventStream,
2961    fs: Option<Arc<dyn Fs>>,
2962    cancellation_rx: watch::Receiver<bool>,
2963}
2964
2965impl ToolCallEventStream {
2966    #[cfg(any(test, feature = "test-support"))]
2967    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
2968        let (stream, receiver, _cancellation_tx) = Self::test_with_cancellation();
2969        (stream, receiver)
2970    }
2971
2972    #[cfg(any(test, feature = "test-support"))]
2973    pub fn test_with_cancellation() -> (Self, ToolCallEventStreamReceiver, watch::Sender<bool>) {
2974        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2975        let (cancellation_tx, cancellation_rx) = watch::channel(false);
2976
2977        let stream = ToolCallEventStream::new(
2978            "test_id".into(),
2979            ThreadEventStream(events_tx),
2980            None,
2981            cancellation_rx,
2982        );
2983
2984        (
2985            stream,
2986            ToolCallEventStreamReceiver(events_rx),
2987            cancellation_tx,
2988        )
2989    }
2990
2991    /// Signal cancellation for this event stream. Only available in tests.
2992    #[cfg(any(test, feature = "test-support"))]
2993    pub fn signal_cancellation_with_sender(cancellation_tx: &mut watch::Sender<bool>) {
2994        cancellation_tx.send(true).ok();
2995    }
2996
2997    fn new(
2998        tool_use_id: LanguageModelToolUseId,
2999        stream: ThreadEventStream,
3000        fs: Option<Arc<dyn Fs>>,
3001        cancellation_rx: watch::Receiver<bool>,
3002    ) -> Self {
3003        Self {
3004            tool_use_id,
3005            stream,
3006            fs,
3007            cancellation_rx,
3008        }
3009    }
3010
3011    /// Returns a future that resolves when the user cancels the tool call.
3012    /// Tools should select on this alongside their main work to detect user cancellation.
3013    pub fn cancelled_by_user(&self) -> impl std::future::Future<Output = ()> + '_ {
3014        let mut rx = self.cancellation_rx.clone();
3015        async move {
3016            loop {
3017                if *rx.borrow() {
3018                    return;
3019                }
3020                if rx.changed().await.is_err() {
3021                    // Sender dropped, will never be cancelled
3022                    std::future::pending::<()>().await;
3023                }
3024            }
3025        }
3026    }
3027
3028    /// Returns true if the user has cancelled this tool call.
3029    /// This is useful for checking cancellation state after an operation completes,
3030    /// to determine if the completion was due to user cancellation.
3031    pub fn was_cancelled_by_user(&self) -> bool {
3032        *self.cancellation_rx.clone().borrow()
3033    }
3034
3035    pub fn tool_use_id(&self) -> &LanguageModelToolUseId {
3036        &self.tool_use_id
3037    }
3038
3039    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
3040        self.stream
3041            .update_tool_call_fields(&self.tool_use_id, fields);
3042    }
3043
3044    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
3045        self.stream
3046            .0
3047            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
3048                acp_thread::ToolCallUpdateDiff {
3049                    id: acp::ToolCallId::new(self.tool_use_id.to_string()),
3050                    diff,
3051                }
3052                .into(),
3053            )))
3054            .ok();
3055    }
3056
3057    pub fn update_subagent_thread(&self, thread: Entity<acp_thread::AcpThread>) {
3058        self.stream
3059            .0
3060            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
3061                acp_thread::ToolCallUpdateSubagentThread {
3062                    id: acp::ToolCallId::new(self.tool_use_id.to_string()),
3063                    thread,
3064                }
3065                .into(),
3066            )))
3067            .ok();
3068    }
3069
3070    /// Authorize a third-party tool (e.g., MCP tool from a context server).
3071    ///
3072    /// Unlike built-in tools, third-party tools don't support pattern-based permissions.
3073    /// They only support `default_mode` (allow/deny/confirm) per tool.
3074    ///
3075    /// Uses the dropdown authorization flow with two granularities:
3076    /// - "Always for <display_name> MCP tool" → sets `tools.<tool_id>.default_mode = "allow"` or "deny"
3077    /// - "Only this time" → allow/deny once
3078    pub fn authorize_third_party_tool(
3079        &self,
3080        title: impl Into<String>,
3081        tool_id: String,
3082        display_name: String,
3083        cx: &mut App,
3084    ) -> Task<Result<()>> {
3085        let settings = agent_settings::AgentSettings::get_global(cx);
3086
3087        let decision = decide_permission_from_settings(&tool_id, "", &settings);
3088
3089        match decision {
3090            ToolPermissionDecision::Allow => return Task::ready(Ok(())),
3091            ToolPermissionDecision::Deny(reason) => return Task::ready(Err(anyhow!(reason))),
3092            ToolPermissionDecision::Confirm => {}
3093        }
3094
3095        let (response_tx, response_rx) = oneshot::channel();
3096        self.stream
3097            .0
3098            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3099                ToolCallAuthorization {
3100                    tool_call: acp::ToolCallUpdate::new(
3101                        self.tool_use_id.to_string(),
3102                        acp::ToolCallUpdateFields::new().title(title.into()),
3103                    ),
3104                    options: acp_thread::PermissionOptions::Dropdown(vec![
3105                        acp_thread::PermissionOptionChoice {
3106                            allow: acp::PermissionOption::new(
3107                                acp::PermissionOptionId::new(format!(
3108                                    "always_allow_mcp:{}",
3109                                    tool_id
3110                                )),
3111                                format!("Always for {} MCP tool", display_name),
3112                                acp::PermissionOptionKind::AllowAlways,
3113                            ),
3114                            deny: acp::PermissionOption::new(
3115                                acp::PermissionOptionId::new(format!(
3116                                    "always_deny_mcp:{}",
3117                                    tool_id
3118                                )),
3119                                format!("Always for {} MCP tool", display_name),
3120                                acp::PermissionOptionKind::RejectAlways,
3121                            ),
3122                        },
3123                        acp_thread::PermissionOptionChoice {
3124                            allow: acp::PermissionOption::new(
3125                                acp::PermissionOptionId::new("allow"),
3126                                "Only this time",
3127                                acp::PermissionOptionKind::AllowOnce,
3128                            ),
3129                            deny: acp::PermissionOption::new(
3130                                acp::PermissionOptionId::new("deny"),
3131                                "Only this time",
3132                                acp::PermissionOptionKind::RejectOnce,
3133                            ),
3134                        },
3135                    ]),
3136                    response: response_tx,
3137                    context: None,
3138                },
3139            )))
3140            .ok();
3141
3142        let fs = self.fs.clone();
3143        cx.spawn(async move |cx| {
3144            let response_str = response_rx.await?.0.to_string();
3145
3146            if response_str == format!("always_allow_mcp:{}", tool_id) {
3147                if let Some(fs) = fs.clone() {
3148                    cx.update(|cx| {
3149                        update_settings_file(fs, cx, move |settings, _| {
3150                            settings
3151                                .agent
3152                                .get_or_insert_default()
3153                                .set_tool_default_mode(&tool_id, ToolPermissionMode::Allow);
3154                        });
3155                    });
3156                }
3157                return Ok(());
3158            }
3159            if response_str == format!("always_deny_mcp:{}", tool_id) {
3160                if let Some(fs) = fs.clone() {
3161                    cx.update(|cx| {
3162                        update_settings_file(fs, cx, move |settings, _| {
3163                            settings
3164                                .agent
3165                                .get_or_insert_default()
3166                                .set_tool_default_mode(&tool_id, ToolPermissionMode::Deny);
3167                        });
3168                    });
3169                }
3170                return Err(anyhow!("Permission to run tool denied by user"));
3171            }
3172
3173            if response_str == "allow" {
3174                return Ok(());
3175            }
3176
3177            Err(anyhow!("Permission to run tool denied by user"))
3178        })
3179    }
3180
3181    pub fn authorize(
3182        &self,
3183        title: impl Into<String>,
3184        context: ToolPermissionContext,
3185        cx: &mut App,
3186    ) -> Task<Result<()>> {
3187        use settings::ToolPermissionMode;
3188
3189        let options = context.build_permission_options();
3190
3191        let (response_tx, response_rx) = oneshot::channel();
3192        self.stream
3193            .0
3194            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3195                ToolCallAuthorization {
3196                    tool_call: acp::ToolCallUpdate::new(
3197                        self.tool_use_id.to_string(),
3198                        acp::ToolCallUpdateFields::new().title(title.into()),
3199                    ),
3200                    options,
3201                    response: response_tx,
3202                    context: Some(context),
3203                },
3204            )))
3205            .ok();
3206
3207        let fs = self.fs.clone();
3208        cx.spawn(async move |cx| {
3209            let response_str = response_rx.await?.0.to_string();
3210
3211            // Handle "always allow tool" - e.g., "always_allow:terminal"
3212            if let Some(tool) = response_str.strip_prefix("always_allow:") {
3213                if let Some(fs) = fs.clone() {
3214                    let tool = tool.to_string();
3215                    cx.update(|cx| {
3216                        update_settings_file(fs, cx, move |settings, _| {
3217                            settings
3218                                .agent
3219                                .get_or_insert_default()
3220                                .set_tool_default_mode(&tool, ToolPermissionMode::Allow);
3221                        });
3222                    });
3223                }
3224                return Ok(());
3225            }
3226
3227            // Handle "always deny tool" - e.g., "always_deny:terminal"
3228            if let Some(tool) = response_str.strip_prefix("always_deny:") {
3229                if let Some(fs) = fs.clone() {
3230                    let tool = tool.to_string();
3231                    cx.update(|cx| {
3232                        update_settings_file(fs, cx, move |settings, _| {
3233                            settings
3234                                .agent
3235                                .get_or_insert_default()
3236                                .set_tool_default_mode(&tool, ToolPermissionMode::Deny);
3237                        });
3238                    });
3239                }
3240                return Err(anyhow!("Permission to run tool denied by user"));
3241            }
3242
3243            // Handle "always allow pattern" - e.g., "always_allow_pattern:terminal:^cargo\s"
3244            if response_str.starts_with("always_allow_pattern:") {
3245                let parts: Vec<&str> = response_str.splitn(3, ':').collect();
3246                if parts.len() == 3 {
3247                    let pattern_tool_name = parts[1].to_string();
3248                    let pattern = parts[2].to_string();
3249                    if let Some(fs) = fs.clone() {
3250                        cx.update(|cx| {
3251                            update_settings_file(fs, cx, move |settings, _| {
3252                                settings
3253                                    .agent
3254                                    .get_or_insert_default()
3255                                    .add_tool_allow_pattern(&pattern_tool_name, pattern);
3256                            });
3257                        });
3258                    }
3259                }
3260                return Ok(());
3261            }
3262
3263            // Handle "always deny pattern" - e.g., "always_deny_pattern:terminal:^cargo\s"
3264            if response_str.starts_with("always_deny_pattern:") {
3265                let parts: Vec<&str> = response_str.splitn(3, ':').collect();
3266                if parts.len() == 3 {
3267                    let pattern_tool_name = parts[1].to_string();
3268                    let pattern = parts[2].to_string();
3269                    if let Some(fs) = fs.clone() {
3270                        cx.update(|cx| {
3271                            update_settings_file(fs, cx, move |settings, _| {
3272                                settings
3273                                    .agent
3274                                    .get_or_insert_default()
3275                                    .add_tool_deny_pattern(&pattern_tool_name, pattern);
3276                            });
3277                        });
3278                    }
3279                }
3280                return Err(anyhow!("Permission to run tool denied by user"));
3281            }
3282
3283            // Handle simple "allow" (allow once)
3284            if response_str == "allow" {
3285                return Ok(());
3286            }
3287
3288            // Handle simple "deny" (deny once)
3289            Err(anyhow!("Permission to run tool denied by user"))
3290        })
3291    }
3292}
3293
3294#[cfg(any(test, feature = "test-support"))]
3295pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
3296
3297#[cfg(any(test, feature = "test-support"))]
3298impl ToolCallEventStreamReceiver {
3299    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
3300        let event = self.0.next().await;
3301        if let Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) = event {
3302            auth
3303        } else {
3304            panic!("Expected ToolCallAuthorization but got: {:?}", event);
3305        }
3306    }
3307
3308    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
3309        let event = self.0.next().await;
3310        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
3311            update,
3312        )))) = event
3313        {
3314            update.fields
3315        } else {
3316            panic!("Expected update fields but got: {:?}", event);
3317        }
3318    }
3319
3320    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
3321        let event = self.0.next().await;
3322        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
3323            update,
3324        )))) = event
3325        {
3326            update.diff
3327        } else {
3328            panic!("Expected diff but got: {:?}", event);
3329        }
3330    }
3331
3332    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
3333        let event = self.0.next().await;
3334        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
3335            update,
3336        )))) = event
3337        {
3338            update.terminal
3339        } else {
3340            panic!("Expected terminal but got: {:?}", event);
3341        }
3342    }
3343}
3344
3345#[cfg(any(test, feature = "test-support"))]
3346impl std::ops::Deref for ToolCallEventStreamReceiver {
3347    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;
3348
3349    fn deref(&self) -> &Self::Target {
3350        &self.0
3351    }
3352}
3353
3354#[cfg(any(test, feature = "test-support"))]
3355impl std::ops::DerefMut for ToolCallEventStreamReceiver {
3356    fn deref_mut(&mut self) -> &mut Self::Target {
3357        &mut self.0
3358    }
3359}
3360
3361impl From<&str> for UserMessageContent {
3362    fn from(text: &str) -> Self {
3363        Self::Text(text.into())
3364    }
3365}
3366
3367impl UserMessageContent {
3368    pub fn from_content_block(value: acp::ContentBlock, path_style: PathStyle) -> Self {
3369        match value {
3370            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
3371            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
3372            acp::ContentBlock::Audio(_) => {
3373                // TODO
3374                Self::Text("[audio]".to_string())
3375            }
3376            acp::ContentBlock::ResourceLink(resource_link) => {
3377                match MentionUri::parse(&resource_link.uri, path_style) {
3378                    Ok(uri) => Self::Mention {
3379                        uri,
3380                        content: String::new(),
3381                    },
3382                    Err(err) => {
3383                        log::error!("Failed to parse mention link: {}", err);
3384                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
3385                    }
3386                }
3387            }
3388            acp::ContentBlock::Resource(resource) => match resource.resource {
3389                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
3390                    match MentionUri::parse(&resource.uri, path_style) {
3391                        Ok(uri) => Self::Mention {
3392                            uri,
3393                            content: resource.text,
3394                        },
3395                        Err(err) => {
3396                            log::error!("Failed to parse mention link: {}", err);
3397                            Self::Text(
3398                                MarkdownCodeBlock {
3399                                    tag: &resource.uri,
3400                                    text: &resource.text,
3401                                }
3402                                .to_string(),
3403                            )
3404                        }
3405                    }
3406                }
3407                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
3408                    // TODO
3409                    Self::Text("[blob]".to_string())
3410                }
3411                other => {
3412                    log::warn!("Unexpected content type: {:?}", other);
3413                    Self::Text("[unknown]".to_string())
3414                }
3415            },
3416            other => {
3417                log::warn!("Unexpected content type: {:?}", other);
3418                Self::Text("[unknown]".to_string())
3419            }
3420        }
3421    }
3422}
3423
3424impl From<UserMessageContent> for acp::ContentBlock {
3425    fn from(content: UserMessageContent) -> Self {
3426        match content {
3427            UserMessageContent::Text(text) => text.into(),
3428            UserMessageContent::Image(image) => {
3429                acp::ContentBlock::Image(acp::ImageContent::new(image.source, "image/png"))
3430            }
3431            UserMessageContent::Mention { uri, content } => acp::ContentBlock::Resource(
3432                acp::EmbeddedResource::new(acp::EmbeddedResourceResource::TextResourceContents(
3433                    acp::TextResourceContents::new(content, uri.to_uri().to_string()),
3434                )),
3435            ),
3436        }
3437    }
3438}
3439
3440fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
3441    LanguageModelImage {
3442        source: image_content.data.into(),
3443        size: None,
3444    }
3445}