thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ProjectSnapshot, ReadFileTool,
   5    RestoreFileFromDiskTool, SaveFileTool, SkillsContext, SpawnAgentTool, StreamingEditFileTool,
   6    SystemPromptTemplate, Template, Templates, TerminalTool, ToolPermissionDecision, WebSearchTool,
   7    decide_permission_from_settings,
   8};
   9use acp_thread::{MentionUri, UserMessageId};
  10use action_log::ActionLog;
  11use feature_flags::{
  12    FeatureFlagAppExt as _, StreamingEditFileToolFeatureFlag, UpdatePlanToolFeatureFlag,
  13};
  14
  15use agent_client_protocol as acp;
  16use agent_settings::{
  17    AgentProfileId, AgentSettings, SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
  18};
  19use anyhow::{Context as _, Result, anyhow};
  20use chrono::{DateTime, Utc};
  21use client::UserStore;
  22use cloud_api_types::Plan;
  23use collections::{HashMap, HashSet, IndexMap};
  24use fs::Fs;
  25use futures::stream;
  26use futures::{
  27    FutureExt,
  28    channel::{mpsc, oneshot},
  29    future::Shared,
  30    stream::FuturesUnordered,
  31};
  32use gpui::{
  33    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  34};
  35use heck::ToSnakeCase as _;
  36use language_model::{
  37    CompletionIntent, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
  38    LanguageModelId, LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry,
  39    LanguageModelRequest, LanguageModelRequestMessage, LanguageModelRequestTool,
  40    LanguageModelToolResult, LanguageModelToolResultContent, LanguageModelToolSchemaFormat,
  41    LanguageModelToolUse, LanguageModelToolUseId, Role, SelectedModel, Speed, StopReason,
  42    TokenUsage, ZED_CLOUD_PROVIDER_ID,
  43};
  44use project::Project;
  45use prompt_store::ProjectContext;
  46use schemars::{JsonSchema, Schema};
  47use serde::de::DeserializeOwned;
  48use serde::{Deserialize, Serialize};
  49use settings::{LanguageModelSelection, Settings, ToolPermissionMode, update_settings_file};
  50use smol::stream::StreamExt;
  51use std::{
  52    collections::BTreeMap,
  53    marker::PhantomData,
  54    ops::RangeInclusive,
  55    path::Path,
  56    rc::Rc,
  57    sync::Arc,
  58    time::{Duration, Instant},
  59};
  60use std::{fmt::Write, path::PathBuf};
  61use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock, paths::PathStyle};
  62use uuid::Uuid;
  63
/// Text used to report that a tool call was canceled by the user.
// NOTE(review): the use site is outside this chunk — confirm where this string is surfaced.
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
/// Upper bound on the length of a tool name.
// NOTE(review): presumably measured in bytes (`str::len`) — confirm against the enforcement site.
pub const MAX_TOOL_NAME_LENGTH: usize = 64;
/// Maximum subagent nesting depth, on the same scale as `SubagentContext::depth`
/// (0 = root agent, 1 = first-level subagent).
// NOTE(review): the check that enforces this limit is not visible in this chunk.
pub const MAX_SUBAGENT_DEPTH: u8 = 1;
  67
/// Context passed to a subagent thread for lifecycle management.
///
/// Derives `Serialize`/`Deserialize`, so it can be round-tripped through serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubagentContext {
    /// Session ID of the parent thread.
    pub parent_thread_id: acp::SessionId,

    /// Current depth level (0 = root agent, 1 = first-level subagent, etc.)
    pub depth: u8,
}
  77
/// The ID of the user prompt that initiated a request.
///
/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
///
/// The ID is a newtype around a shared string (`Arc<str>`), so cloning it does not copy the text.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct PromptId(Arc<str>);
  83
  84impl PromptId {
  85    pub fn new() -> Self {
  86        Self(Uuid::new_v4().to_string().into())
  87    }
  88}
  89
  90impl std::fmt::Display for PromptId {
  91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  92        write!(f, "{}", self.0)
  93    }
  94}
  95
/// Crate-wide cap on retry attempts.
// NOTE(review): the retry loop that consumes this is outside this chunk — confirm whether
// the count includes the initial attempt.
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Base delay (5 seconds) used for retries.
// NOTE(review): presumably the starting delay fed into `RetryStrategy` — confirm at the use site.
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);
  98
/// How a failed request should be retried.
// NOTE(review): the code that interprets these variants is not visible in this chunk; the
// per-variant notes below are based on the variant and field names only.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Retry with a delay that starts at `initial_delay` (presumably growing on each attempt),
    /// giving up after `max_attempts`.
    ExponentialBackoff {
        initial_delay: Duration,
        max_attempts: u8,
    },
    /// Retry with the same `delay` between attempts, giving up after `max_attempts`.
    Fixed {
        delay: Duration,
        max_attempts: u8,
    },
}
 110
/// A single entry in a thread's message history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// A message authored by the user.
    User(UserMessage),
    /// A message produced by the agent, including any tool uses and their results.
    Agent(AgentMessage),
    /// A marker with no payload; it is sent to the model as a user-role
    /// "Continue where you left off" request (see `Message::to_request`).
    Resume,
}
 117
 118impl Message {
 119    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 120        match self {
 121            Message::Agent(agent_message) => Some(agent_message),
 122            _ => None,
 123        }
 124    }
 125
 126    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 127        match self {
 128            Message::User(message) => {
 129                if message.content.is_empty() {
 130                    vec![]
 131                } else {
 132                    vec![message.to_request()]
 133                }
 134            }
 135            Message::Agent(message) => message.to_request(),
 136            Message::Resume => vec![LanguageModelRequestMessage {
 137                role: Role::User,
 138                content: vec!["Continue where you left off".into()],
 139                cache: false,
 140                reasoning_details: None,
 141            }],
 142        }
 143    }
 144
 145    pub fn to_markdown(&self) -> String {
 146        match self {
 147            Message::User(message) => message.to_markdown(),
 148            Message::Agent(message) => message.to_markdown(),
 149            Message::Resume => "[resume]\n".into(),
 150        }
 151    }
 152
 153    pub fn role(&self) -> Role {
 154        match self {
 155            Message::User(_) | Message::Resume => Role::User,
 156            Message::Agent(_) => Role::Assistant,
 157        }
 158    }
 159}
 160
/// A message submitted by the user.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this user message.
    pub id: UserMessageId,
    /// Ordered content chunks (text, mentions, images) that make up the message.
    pub content: Vec<UserMessageContent>,
}
 166
/// One chunk of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text typed by the user.
    Text(String),
    /// A reference to some resource (`uri`) together with the text captured for it (`content`).
    Mention { uri: MentionUri, content: String },
    /// An image attached to the message.
    Image(LanguageModelImage),
}
 173
 174impl UserMessage {
 175    pub fn to_markdown(&self) -> String {
 176        let mut markdown = String::new();
 177
 178        for content in &self.content {
 179            match content {
 180                UserMessageContent::Text(text) => {
 181                    markdown.push_str(text);
 182                    markdown.push('\n');
 183                }
 184                UserMessageContent::Image(_) => {
 185                    markdown.push_str("<image />\n");
 186                }
 187                UserMessageContent::Mention { uri, content } => {
 188                    if !content.is_empty() {
 189                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 190                    } else {
 191                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 192                    }
 193                }
 194            }
 195        }
 196
 197        markdown
 198    }
 199
 200    fn to_request(&self) -> LanguageModelRequestMessage {
 201        let mut message = LanguageModelRequestMessage {
 202            role: Role::User,
 203            content: Vec::with_capacity(self.content.len()),
 204            cache: false,
 205            reasoning_details: None,
 206        };
 207
 208        const OPEN_CONTEXT: &str = "<context>\n\
 209            The following items were attached by the user. \
 210            They are up-to-date and don't need to be re-read.\n\n";
 211
 212        const OPEN_FILES_TAG: &str = "<files>";
 213        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 214        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 215        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 216        const OPEN_THREADS_TAG: &str = "<threads>";
 217        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 218        const OPEN_RULES_TAG: &str =
 219            "<rules>\nThe user has specified the following rules that should be applied:\n";
 220        const OPEN_DIAGNOSTICS_TAG: &str = "<diagnostics>";
 221        const OPEN_DIFFS_TAG: &str = "<diffs>";
 222        const MERGE_CONFLICT_TAG: &str = "<merge_conflicts>";
 223
 224        let mut file_context = OPEN_FILES_TAG.to_string();
 225        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 226        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 227        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 228        let mut thread_context = OPEN_THREADS_TAG.to_string();
 229        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 230        let mut rules_context = OPEN_RULES_TAG.to_string();
 231        let mut diagnostics_context = OPEN_DIAGNOSTICS_TAG.to_string();
 232        let mut diffs_context = OPEN_DIFFS_TAG.to_string();
 233        let mut merge_conflict_context = MERGE_CONFLICT_TAG.to_string();
 234
 235        for chunk in &self.content {
 236            let chunk = match chunk {
 237                UserMessageContent::Text(text) => {
 238                    language_model::MessageContent::Text(text.clone())
 239                }
 240                UserMessageContent::Image(value) => {
 241                    language_model::MessageContent::Image(value.clone())
 242                }
 243                UserMessageContent::Mention { uri, content } => {
 244                    match uri {
 245                        MentionUri::File { abs_path } => {
 246                            write!(
 247                                &mut file_context,
 248                                "\n{}",
 249                                MarkdownCodeBlock {
 250                                    tag: &codeblock_tag(abs_path, None),
 251                                    text: &content.to_string(),
 252                                }
 253                            )
 254                            .ok();
 255                        }
 256                        MentionUri::PastedImage => {
 257                            debug_panic!("pasted image URI should not be used in mention content")
 258                        }
 259                        MentionUri::Directory { .. } => {
 260                            write!(&mut directory_context, "\n{}\n", content).ok();
 261                        }
 262                        MentionUri::Symbol {
 263                            abs_path: path,
 264                            line_range,
 265                            ..
 266                        } => {
 267                            write!(
 268                                &mut symbol_context,
 269                                "\n{}",
 270                                MarkdownCodeBlock {
 271                                    tag: &codeblock_tag(path, Some(line_range)),
 272                                    text: content
 273                                }
 274                            )
 275                            .ok();
 276                        }
 277                        MentionUri::Selection {
 278                            abs_path: path,
 279                            line_range,
 280                            ..
 281                        } => {
 282                            write!(
 283                                &mut selection_context,
 284                                "\n{}",
 285                                MarkdownCodeBlock {
 286                                    tag: &codeblock_tag(
 287                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 288                                        Some(line_range)
 289                                    ),
 290                                    text: content
 291                                }
 292                            )
 293                            .ok();
 294                        }
 295                        MentionUri::Thread { .. } => {
 296                            write!(&mut thread_context, "\n{}\n", content).ok();
 297                        }
 298                        MentionUri::Rule { .. } => {
 299                            write!(
 300                                &mut rules_context,
 301                                "\n{}",
 302                                MarkdownCodeBlock {
 303                                    tag: "",
 304                                    text: content
 305                                }
 306                            )
 307                            .ok();
 308                        }
 309                        MentionUri::Fetch { url } => {
 310                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 311                        }
 312                        MentionUri::Diagnostics { .. } => {
 313                            write!(&mut diagnostics_context, "\n{}\n", content).ok();
 314                        }
 315                        MentionUri::TerminalSelection { .. } => {
 316                            write!(
 317                                &mut selection_context,
 318                                "\n{}",
 319                                MarkdownCodeBlock {
 320                                    tag: "console",
 321                                    text: content
 322                                }
 323                            )
 324                            .ok();
 325                        }
 326                        MentionUri::GitDiff { base_ref } => {
 327                            write!(
 328                                &mut diffs_context,
 329                                "\nBranch diff against {}:\n{}",
 330                                base_ref,
 331                                MarkdownCodeBlock {
 332                                    tag: "diff",
 333                                    text: content
 334                                }
 335                            )
 336                            .ok();
 337                        }
 338                        MentionUri::MergeConflict { file_path } => {
 339                            write!(
 340                                &mut merge_conflict_context,
 341                                "\nMerge conflict in {}:\n{}",
 342                                file_path,
 343                                MarkdownCodeBlock {
 344                                    tag: "diff",
 345                                    text: content
 346                                }
 347                            )
 348                            .ok();
 349                        }
 350                    }
 351
 352                    language_model::MessageContent::Text(uri.as_link().to_string())
 353                }
 354            };
 355
 356            message.content.push(chunk);
 357        }
 358
 359        let len_before_context = message.content.len();
 360
 361        if file_context.len() > OPEN_FILES_TAG.len() {
 362            file_context.push_str("</files>\n");
 363            message
 364                .content
 365                .push(language_model::MessageContent::Text(file_context));
 366        }
 367
 368        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 369            directory_context.push_str("</directories>\n");
 370            message
 371                .content
 372                .push(language_model::MessageContent::Text(directory_context));
 373        }
 374
 375        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 376            symbol_context.push_str("</symbols>\n");
 377            message
 378                .content
 379                .push(language_model::MessageContent::Text(symbol_context));
 380        }
 381
 382        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 383            selection_context.push_str("</selections>\n");
 384            message
 385                .content
 386                .push(language_model::MessageContent::Text(selection_context));
 387        }
 388
 389        if diffs_context.len() > OPEN_DIFFS_TAG.len() {
 390            diffs_context.push_str("</diffs>\n");
 391            message
 392                .content
 393                .push(language_model::MessageContent::Text(diffs_context));
 394        }
 395
 396        if thread_context.len() > OPEN_THREADS_TAG.len() {
 397            thread_context.push_str("</threads>\n");
 398            message
 399                .content
 400                .push(language_model::MessageContent::Text(thread_context));
 401        }
 402
 403        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 404            fetch_context.push_str("</fetched_urls>\n");
 405            message
 406                .content
 407                .push(language_model::MessageContent::Text(fetch_context));
 408        }
 409
 410        if rules_context.len() > OPEN_RULES_TAG.len() {
 411            rules_context.push_str("</user_rules>\n");
 412            message
 413                .content
 414                .push(language_model::MessageContent::Text(rules_context));
 415        }
 416
 417        if diagnostics_context.len() > OPEN_DIAGNOSTICS_TAG.len() {
 418            diagnostics_context.push_str("</diagnostics>\n");
 419            message
 420                .content
 421                .push(language_model::MessageContent::Text(diagnostics_context));
 422        }
 423
 424        if merge_conflict_context.len() > MERGE_CONFLICT_TAG.len() {
 425            merge_conflict_context.push_str("</merge_conflicts>\n");
 426            message
 427                .content
 428                .push(language_model::MessageContent::Text(merge_conflict_context));
 429        }
 430
 431        if message.content.len() > len_before_context {
 432            message.content.insert(
 433                len_before_context,
 434                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 435            );
 436            message
 437                .content
 438                .push(language_model::MessageContent::Text("</context>".into()));
 439        }
 440
 441        message
 442    }
 443}
 444
 445fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 446    let mut result = String::new();
 447
 448    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 449        let _ = write!(result, "{} ", extension);
 450    }
 451
 452    let _ = write!(result, "{}", full_path.display());
 453
 454    if let Some(range) = line_range {
 455        if range.start() == range.end() {
 456            let _ = write!(result, ":{}", range.start() + 1);
 457        } else {
 458            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 459        }
 460    }
 461
 462    result
 463}
 464
 465impl AgentMessage {
 466    pub fn to_markdown(&self) -> String {
 467        let mut markdown = String::new();
 468
 469        for content in &self.content {
 470            match content {
 471                AgentMessageContent::Text(text) => {
 472                    markdown.push_str(text);
 473                    markdown.push('\n');
 474                }
 475                AgentMessageContent::Thinking { text, .. } => {
 476                    markdown.push_str("<think>");
 477                    markdown.push_str(text);
 478                    markdown.push_str("</think>\n");
 479                }
 480                AgentMessageContent::RedactedThinking(_) => {
 481                    markdown.push_str("<redacted_thinking />\n")
 482                }
 483                AgentMessageContent::ToolUse(tool_use) => {
 484                    markdown.push_str(&format!(
 485                        "**Tool Use**: {} (ID: {})\n",
 486                        tool_use.name, tool_use.id
 487                    ));
 488                    markdown.push_str(&format!(
 489                        "{}\n",
 490                        MarkdownCodeBlock {
 491                            tag: "json",
 492                            text: &format!("{:#}", tool_use.input)
 493                        }
 494                    ));
 495                }
 496            }
 497        }
 498
 499        for tool_result in self.tool_results.values() {
 500            markdown.push_str(&format!(
 501                "**Tool Result**: {} (ID: {})\n\n",
 502                tool_result.tool_name, tool_result.tool_use_id
 503            ));
 504            if tool_result.is_error {
 505                markdown.push_str("**ERROR:**\n");
 506            }
 507
 508            match &tool_result.content {
 509                LanguageModelToolResultContent::Text(text) => {
 510                    writeln!(markdown, "{text}\n").ok();
 511                }
 512                LanguageModelToolResultContent::Image(_) => {
 513                    writeln!(markdown, "<image />\n").ok();
 514                }
 515            }
 516
 517            if let Some(output) = tool_result.output.as_ref() {
 518                writeln!(
 519                    markdown,
 520                    "**Debug Output**:\n\n```json\n{}\n```\n",
 521                    serde_json::to_string_pretty(output).unwrap()
 522                )
 523                .unwrap();
 524            }
 525        }
 526
 527        markdown
 528    }
 529
 530    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 531        let mut assistant_message = LanguageModelRequestMessage {
 532            role: Role::Assistant,
 533            content: Vec::with_capacity(self.content.len()),
 534            cache: false,
 535            reasoning_details: self.reasoning_details.clone(),
 536        };
 537        for chunk in &self.content {
 538            match chunk {
 539                AgentMessageContent::Text(text) => {
 540                    assistant_message
 541                        .content
 542                        .push(language_model::MessageContent::Text(text.clone()));
 543                }
 544                AgentMessageContent::Thinking { text, signature } => {
 545                    assistant_message
 546                        .content
 547                        .push(language_model::MessageContent::Thinking {
 548                            text: text.clone(),
 549                            signature: signature.clone(),
 550                        });
 551                }
 552                AgentMessageContent::RedactedThinking(value) => {
 553                    assistant_message.content.push(
 554                        language_model::MessageContent::RedactedThinking(value.clone()),
 555                    );
 556                }
 557                AgentMessageContent::ToolUse(tool_use) => {
 558                    if self.tool_results.contains_key(&tool_use.id) {
 559                        assistant_message
 560                            .content
 561                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 562                    }
 563                }
 564            };
 565        }
 566
 567        let mut user_message = LanguageModelRequestMessage {
 568            role: Role::User,
 569            content: Vec::new(),
 570            cache: false,
 571            reasoning_details: None,
 572        };
 573
 574        for tool_result in self.tool_results.values() {
 575            let mut tool_result = tool_result.clone();
 576            // Surprisingly, the API fails if we return an empty string here.
 577            // It thinks we are sending a tool use without a tool result.
 578            if tool_result.content.is_empty() {
 579                tool_result.content = "<Tool returned an empty string>".into();
 580            }
 581            user_message
 582                .content
 583                .push(language_model::MessageContent::ToolResult(tool_result));
 584        }
 585
 586        let mut messages = Vec::new();
 587        if !assistant_message.content.is_empty() {
 588            messages.push(assistant_message);
 589        }
 590        if !user_message.content.is_empty() {
 591            messages.push(user_message);
 592        }
 593        messages
 594    }
 595}
 596
/// A message produced by the agent, together with the results of the tools it invoked.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Ordered content chunks (text, thinking, tool uses).
    pub content: Vec<AgentMessageContent>,
    /// Tool results keyed by tool-use ID; an `IndexMap`, so iteration follows insertion order.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
    /// Opaque reasoning payload that is passed back unchanged on the assistant request message.
    pub reasoning_details: Option<serde_json::Value>,
}
 603
/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Plain response text.
    Text(String),
    /// Thinking text with an optional signature; both are forwarded unchanged in requests.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Thinking whose text is not available; rendered as `<redacted_thinking />` in Markdown.
    RedactedThinking(String),
    /// A tool invocation requested by the model.
    ToolUse(LanguageModelToolUse),
}
 614
/// Handle to a terminal created through [`ThreadEnvironment::create_terminal`].
// NOTE(review): implementations are outside this chunk; the per-method notes below are
// based on the signatures and names only — confirm against an implementation.
pub trait TerminalHandle {
    /// Returns the terminal's ID.
    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
    /// Returns the terminal's output as of now.
    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
    /// Returns a shareable task that resolves to the terminal's exit status.
    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
    /// Kills the terminal.
    fn kill(&self, cx: &AsyncApp) -> Result<()>;
    /// Reports whether the terminal was stopped by the user.
    fn was_stopped_by_user(&self, cx: &AsyncApp) -> Result<bool>;
}
 622
/// Handle to a subagent thread, as returned by [`ThreadEnvironment::create_subagent`].
pub trait SubagentHandle {
    /// The session ID of this subagent thread
    fn id(&self) -> acp::SessionId;
    /// The current number of entries in the thread.
    /// Useful for knowing where the next turn will begin
    fn num_entries(&self, cx: &App) -> usize;
    /// Runs a turn for the given message; the returned task resolves to the response text.
    // NOTE(review): an earlier version of this comment said the index of the output message
    // is returned as well, but the signature only yields a `String`.
    fn send(&self, message: String, cx: &AsyncApp) -> Task<Result<String>>;
}
 632
 633pub trait ThreadEnvironment {
 634    fn create_terminal(
 635        &self,
 636        command: String,
 637        cwd: Option<PathBuf>,
 638        output_byte_limit: Option<u64>,
 639        cx: &mut AsyncApp,
 640    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
 641
 642    fn create_subagent(&self, label: String, cx: &mut App) -> Result<Rc<dyn SubagentHandle>>;
 643
 644    fn resume_subagent(
 645        &self,
 646        _session_id: acp::SessionId,
 647        _cx: &mut App,
 648    ) -> Result<Rc<dyn SubagentHandle>> {
 649        Err(anyhow::anyhow!(
 650            "Resuming subagent sessions is not supported"
 651        ))
 652    }
 653}
 654
/// Events reported by a thread.
// NOTE(review): the emitting code is outside this chunk; the per-variant notes are based on
// the variant names and payload types only.
#[derive(Debug)]
pub enum ThreadEvent {
    /// A user message.
    UserMessage(UserMessage),
    /// Text produced by the agent.
    AgentText(String),
    /// Thinking text produced by the agent.
    AgentThinking(String),
    /// A tool call.
    ToolCall(acp::ToolCall),
    /// An update to a tool call.
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    /// A plan.
    Plan(acp::Plan),
    /// A request to authorize a tool call.
    ToolCallAuthorization(ToolCallAuthorization),
    /// A subagent was spawned; carries its session ID.
    SubagentSpawned(acp::SessionId),
    /// Retry status.
    Retry(acp_thread::RetryStatus),
    /// The reason the turn stopped.
    Stop(acp::StopReason),
}
 668
/// A request to create a terminal, carrying a one-shot channel for the result.
#[derive(Debug)]
pub struct NewTerminal {
    /// The command to run.
    pub command: String,
    /// Optional limit on output size, in bytes.
    pub output_byte_limit: Option<u64>,
    /// Optional working directory.
    pub cwd: Option<PathBuf>,
    /// Channel over which the created terminal entity (or an error) is sent back.
    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
}
 676
/// Describes what a tool permission prompt is about: which tool, which input values,
/// and in which scope.
#[derive(Debug, Clone)]
pub struct ToolPermissionContext {
    /// Name of the tool requesting permission.
    pub tool_name: String,
    /// Values the permission applies to; for the `SymlinkTarget` scope these are the
    /// target paths (see `ToolPermissionContext::symlink_target`).
    pub input_values: Vec<String>,
    /// What `input_values` refers to.
    pub scope: ToolPermissionScope,
}
 683
/// What the values in a [`ToolPermissionContext`] refer to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolPermissionScope {
    /// The values are the tool's input (the scope set by `ToolPermissionContext::new`).
    ToolInput,
    /// The values are symlink target paths; permission options for this scope are a flat
    /// allow-once / deny-once pair.
    SymlinkTarget,
}
 689
 690impl ToolPermissionContext {
 691    pub fn new(tool_name: impl Into<String>, input_values: Vec<String>) -> Self {
 692        Self {
 693            tool_name: tool_name.into(),
 694            input_values,
 695            scope: ToolPermissionScope::ToolInput,
 696        }
 697    }
 698
 699    pub fn symlink_target(tool_name: impl Into<String>, target_paths: Vec<String>) -> Self {
 700        Self {
 701            tool_name: tool_name.into(),
 702            input_values: target_paths,
 703            scope: ToolPermissionScope::SymlinkTarget,
 704        }
 705    }
 706
 707    /// Builds the permission options for this tool context.
 708    ///
 709    /// This is the canonical source for permission option generation.
 710    /// Tests should use this function rather than manually constructing options.
 711    ///
 712    /// # Shell Compatibility for Terminal Tool
 713    ///
 714    /// For the terminal tool, "Always allow" options are only shown when the user's
 715    /// shell supports POSIX-like command chaining syntax (`&&`, `||`, `;`, `|`).
 716    ///
 717    /// **Why this matters:** When a user sets up an "always allow" pattern like `^cargo`,
 718    /// we need to parse the command to extract all sub-commands and verify that EVERY
 719    /// sub-command matches the pattern. Otherwise, an attacker could craft a command like
 720    /// `cargo build && rm -rf /` that would bypass the security check.
 721    ///
 722    /// **Supported shells:** Posix (sh, bash, dash, zsh), Fish 3.0+, PowerShell 7+/Pwsh,
 723    /// Cmd, Xonsh, Csh, Tcsh
 724    ///
 725    /// **Unsupported shells:** Nushell (uses `and`/`or` keywords), Elvish (uses `and`/`or`
 726    /// keywords), Rc (Plan 9 shell - no `&&`/`||` operators)
 727    ///
 728    /// For unsupported shells, we hide the "Always allow" UI options entirely, and if
 729    /// the user has `always_allow` rules configured in settings, `ToolPermissionDecision::from_input`
 730    /// will return a `Deny` with an explanatory error message.
 731    pub fn build_permission_options(&self) -> acp_thread::PermissionOptions {
 732        use crate::pattern_extraction::*;
 733        use util::shell::ShellKind;
 734
 735        let tool_name = &self.tool_name;
 736        let input_values = &self.input_values;
 737        if self.scope == ToolPermissionScope::SymlinkTarget {
 738            return acp_thread::PermissionOptions::Flat(vec![
 739                acp::PermissionOption::new(
 740                    acp::PermissionOptionId::new("allow"),
 741                    "Yes",
 742                    acp::PermissionOptionKind::AllowOnce,
 743                ),
 744                acp::PermissionOption::new(
 745                    acp::PermissionOptionId::new("deny"),
 746                    "No",
 747                    acp::PermissionOptionKind::RejectOnce,
 748                ),
 749            ]);
 750        }
 751
 752        // Check if the user's shell supports POSIX-like command chaining.
 753        // See the doc comment above for the full explanation of why this is needed.
 754        let shell_supports_always_allow = if tool_name == TerminalTool::NAME {
 755            ShellKind::system().supports_posix_chaining()
 756        } else {
 757            true
 758        };
 759
 760        // For terminal commands with multiple pipeline commands, use DropdownWithPatterns
 761        // to let users individually select which command patterns to always allow.
 762        if tool_name == TerminalTool::NAME && shell_supports_always_allow {
 763            if let Some(input) = input_values.first() {
 764                let all_patterns = extract_all_terminal_patterns(input);
 765                if all_patterns.len() > 1 {
 766                    let mut choices = Vec::new();
 767                    choices.push(acp_thread::PermissionOptionChoice {
 768                        allow: acp::PermissionOption::new(
 769                            acp::PermissionOptionId::new(format!("always_allow:{}", tool_name)),
 770                            format!("Always for {}", tool_name.replace('_', " ")),
 771                            acp::PermissionOptionKind::AllowAlways,
 772                        ),
 773                        deny: acp::PermissionOption::new(
 774                            acp::PermissionOptionId::new(format!("always_deny:{}", tool_name)),
 775                            format!("Always for {}", tool_name.replace('_', " ")),
 776                            acp::PermissionOptionKind::RejectAlways,
 777                        ),
 778                        sub_patterns: vec![],
 779                    });
 780                    choices.push(acp_thread::PermissionOptionChoice {
 781                        allow: acp::PermissionOption::new(
 782                            acp::PermissionOptionId::new("allow"),
 783                            "Only this time",
 784                            acp::PermissionOptionKind::AllowOnce,
 785                        ),
 786                        deny: acp::PermissionOption::new(
 787                            acp::PermissionOptionId::new("deny"),
 788                            "Only this time",
 789                            acp::PermissionOptionKind::RejectOnce,
 790                        ),
 791                        sub_patterns: vec![],
 792                    });
 793                    return acp_thread::PermissionOptions::DropdownWithPatterns {
 794                        choices,
 795                        patterns: all_patterns,
 796                        tool_name: tool_name.clone(),
 797                    };
 798                }
 799            }
 800        }
 801
 802        let extract_for_value = |value: &str| -> (Option<String>, Option<String>) {
 803            if tool_name == TerminalTool::NAME {
 804                (
 805                    extract_terminal_pattern(value),
 806                    extract_terminal_pattern_display(value),
 807                )
 808            } else if tool_name == CopyPathTool::NAME
 809                || tool_name == MovePathTool::NAME
 810                || tool_name == EditFileTool::NAME
 811                || tool_name == DeletePathTool::NAME
 812                || tool_name == CreateDirectoryTool::NAME
 813                || tool_name == SaveFileTool::NAME
 814            {
 815                (
 816                    extract_path_pattern(value),
 817                    extract_path_pattern_display(value),
 818                )
 819            } else if tool_name == FetchTool::NAME {
 820                (
 821                    extract_url_pattern(value),
 822                    extract_url_pattern_display(value),
 823                )
 824            } else {
 825                (None, None)
 826            }
 827        };
 828
 829        // Extract patterns from all input values. Only offer a pattern-specific
 830        // "always allow/deny" button when every value produces the same pattern.
 831        let (pattern, pattern_display) = match input_values.as_slice() {
 832            [single] => extract_for_value(single),
 833            _ => {
 834                let mut iter = input_values.iter().map(|v| extract_for_value(v));
 835                match iter.next() {
 836                    Some(first) => {
 837                        if iter.all(|pair| pair.0 == first.0) {
 838                            first
 839                        } else {
 840                            (None, None)
 841                        }
 842                    }
 843                    None => (None, None),
 844                }
 845            }
 846        };
 847
 848        let mut choices = Vec::new();
 849
 850        let mut push_choice =
 851            |label: String, allow_id, deny_id, allow_kind, deny_kind, sub_patterns: Vec<String>| {
 852                choices.push(acp_thread::PermissionOptionChoice {
 853                    allow: acp::PermissionOption::new(
 854                        acp::PermissionOptionId::new(allow_id),
 855                        label.clone(),
 856                        allow_kind,
 857                    ),
 858                    deny: acp::PermissionOption::new(
 859                        acp::PermissionOptionId::new(deny_id),
 860                        label,
 861                        deny_kind,
 862                    ),
 863                    sub_patterns,
 864                });
 865            };
 866
 867        if shell_supports_always_allow {
 868            push_choice(
 869                format!("Always for {}", tool_name.replace('_', " ")),
 870                format!("always_allow:{}", tool_name),
 871                format!("always_deny:{}", tool_name),
 872                acp::PermissionOptionKind::AllowAlways,
 873                acp::PermissionOptionKind::RejectAlways,
 874                vec![],
 875            );
 876
 877            if let (Some(pattern), Some(display)) = (pattern, pattern_display) {
 878                let button_text = if tool_name == TerminalTool::NAME {
 879                    format!("Always for `{}` commands", display)
 880                } else {
 881                    format!("Always for `{}`", display)
 882                };
 883                push_choice(
 884                    button_text,
 885                    format!("always_allow:{}", tool_name),
 886                    format!("always_deny:{}", tool_name),
 887                    acp::PermissionOptionKind::AllowAlways,
 888                    acp::PermissionOptionKind::RejectAlways,
 889                    vec![pattern],
 890                );
 891            }
 892        }
 893
 894        push_choice(
 895            "Only this time".to_string(),
 896            "allow".to_string(),
 897            "deny".to_string(),
 898            acp::PermissionOptionKind::AllowOnce,
 899            acp::PermissionOptionKind::RejectOnce,
 900            vec![],
 901        );
 902
 903        acp_thread::PermissionOptions::Dropdown(choices)
 904    }
 905}
 906
/// A request to authorize a single tool call.
///
/// Bundles the tool call being authorized with the permission options that
/// can be chosen for it and a one-shot channel on which the selected outcome
/// is delivered.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call (as an update payload) that needs authorization.
    pub tool_call: acp::ToolCallUpdate,
    /// The permission choices that may be selected for this tool call.
    pub options: acp_thread::PermissionOptions,
    /// One-shot sender used to report the selected permission outcome.
    pub response: oneshot::Sender<acp_thread::SelectedPermissionOutcome>,
    /// Optional permission context for the tool call.
    // NOTE(review): presumably identifies the tool and inputs the options were built from — confirm.
    pub context: Option<ToolPermissionContext>,
}
 914
/// Errors that can end a completion.
#[derive(Debug, thiserror::Error)]
enum CompletionError {
    /// Displayed as `max tokens`.
    // NOTE(review): presumably raised when the model stops at its token limit — confirm.
    #[error("max tokens")]
    MaxTokens,
    /// Displayed as `refusal`.
    // NOTE(review): presumably raised when the model refuses the request — confirm.
    #[error("refusal")]
    Refusal,
    /// Any other failure. Displays transparently as the wrapped error and can
    /// be created from an `anyhow::Error` via `From` (and therefore `?`).
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
 924
/// State of a single agent thread: its messages, the model and profile it
/// runs with, its registered tools, and the project it operates on.
pub struct Thread {
    /// Session identifier. Freshly created threads get a random UUID v4;
    /// threads restored with `from_db` keep the id they are given.
    id: acp::SessionId,
    /// A fresh `PromptId` is assigned whenever the thread is constructed
    /// (both for new threads and for threads restored from the database).
    prompt_id: PromptId,
    /// Last-updated timestamp; persisted in and restored from `DbThread`.
    updated_at: DateTime<Utc>,
    /// Thread title. An empty persisted title is restored as `None`.
    title: Option<SharedString>,
    // NOTE(review): presumably the in-flight title generation task — confirm.
    pending_title_generation: Option<Task<()>>,
    // NOTE(review): presumably the in-flight (shareable) summary generation task — confirm.
    pending_summary_generation: Option<Shared<Task<Option<SharedString>>>>,
    /// Thread summary; persisted as `DbThread::detailed_summary`.
    summary: Option<SharedString>,
    /// Messages of the thread, in order.
    messages: Vec<Message>,
    /// The project's user store, captured when the thread is constructed.
    user_store: Entity<UserStore>,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// Flag indicating the UI has a queued message waiting to be sent.
    /// Used to signal that the turn should end at the next message boundary.
    has_queued_message: bool,
    /// Agent message that has not yet been appended to `messages`.
    pending_message: Option<AgentMessage>,
    /// Tools registered on this thread, keyed by tool name (see `add_tool`).
    pub(crate) tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// Token usage per user message; persisted in and restored from `DbThread`.
    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
    /// Cumulative token usage; persisted in and restored from `DbThread`.
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    /// Snapshot of the project taken when the thread was first created.
    /// Resolved asynchronously for new threads; loaded as-is from `DbThread`.
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Registry of context servers; also consulted for tools when replaying
    /// tool calls whose tool is not registered directly on the thread.
    pub(crate) context_server_registry: Entity<ContextServerRegistry>,
    /// Active agent profile. Defaults to the settings' default profile.
    profile_id: AgentProfileId,
    project_context: Entity<ProjectContext>,
    pub(crate) templates: Arc<Templates>,
    /// Formatted available skills for the system prompt.
    available_skills: Entity<SkillsContext>,
    /// Model used by this thread, if any. `set_model` also forwards the new
    /// model to running subagents.
    model: Option<Arc<dyn LanguageModel>>,
    /// Optional model used for summarization; starts out as `None`.
    summarization_model: Option<Arc<dyn LanguageModel>>,
    /// Whether thinking is enabled; seeded from the default model settings
    /// for new threads and from `DbThread` for restored ones.
    thinking_enabled: bool,
    /// Optional thinking effort; seeded the same way as `thinking_enabled`.
    thinking_effort: Option<String>,
    /// Optional speed setting; `None` for new threads.
    speed: Option<Speed>,
    /// Sender half of the watch channel publishing the prompt capabilities
    /// derived from the current model.
    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
    /// Receiver half of the prompt-capabilities watch channel.
    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
    /// The project this thread operates on.
    pub(crate) project: Entity<Project>,
    /// Action log for this thread. Subagents get their own log linked to the
    /// parent's log.
    pub(crate) action_log: Entity<ActionLog>,
    /// True if this thread was imported from a shared thread and can be synced.
    imported: bool,
    /// If this is a subagent thread, contains context about the parent
    subagent_context: Option<SubagentContext>,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// UI scroll position; persisted in `DbThread` in serialized form.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Weak references to running subagent threads for cancellation propagation
    running_subagents: Vec<WeakEntity<Thread>>,
}
 974
 975impl Thread {
 976    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 977        let image = model.map_or(true, |model| model.supports_images());
 978        acp::PromptCapabilities::new()
 979            .image(image)
 980            .embedded_context(true)
 981    }
 982
 983    pub fn new_subagent(parent_thread: &Entity<Thread>, cx: &mut Context<Self>) -> Self {
 984        let project = parent_thread.read(cx).project.clone();
 985        let project_context = parent_thread.read(cx).project_context.clone();
 986        let context_server_registry = parent_thread.read(cx).context_server_registry.clone();
 987        let templates = parent_thread.read(cx).templates.clone();
 988        let model = parent_thread.read(cx).model().cloned();
 989        let parent_action_log = parent_thread.read(cx).action_log().clone();
 990        let action_log =
 991            cx.new(|_cx| ActionLog::new(project.clone()).with_linked_action_log(parent_action_log));
 992        let mut thread = Self::new_internal(
 993            project,
 994            project_context,
 995            context_server_registry,
 996            templates,
 997            model,
 998            action_log,
 999            cx,
1000        );
1001        thread.subagent_context = Some(SubagentContext {
1002            parent_thread_id: parent_thread.read(cx).id().clone(),
1003            depth: parent_thread.read(cx).depth() + 1,
1004        });
1005        thread
1006    }
1007
1008    pub fn new(
1009        project: Entity<Project>,
1010        project_context: Entity<ProjectContext>,
1011        context_server_registry: Entity<ContextServerRegistry>,
1012        templates: Arc<Templates>,
1013        model: Option<Arc<dyn LanguageModel>>,
1014        cx: &mut Context<Self>,
1015    ) -> Self {
1016        Self::new_internal(
1017            project.clone(),
1018            project_context,
1019            context_server_registry,
1020            templates,
1021            model,
1022            cx.new(|_cx| ActionLog::new(project)),
1023            cx,
1024        )
1025    }
1026
1027    fn new_internal(
1028        project: Entity<Project>,
1029        project_context: Entity<ProjectContext>,
1030        context_server_registry: Entity<ContextServerRegistry>,
1031        templates: Arc<Templates>,
1032        model: Option<Arc<dyn LanguageModel>>,
1033        action_log: Entity<ActionLog>,
1034        cx: &mut Context<Self>,
1035    ) -> Self {
1036        let settings = AgentSettings::get_global(cx);
1037        let profile_id = settings.default_profile.clone();
1038        let enable_thinking = settings
1039            .default_model
1040            .as_ref()
1041            .is_some_and(|model| model.enable_thinking);
1042        let thinking_effort = settings
1043            .default_model
1044            .as_ref()
1045            .and_then(|model| model.effort.clone());
1046        let (prompt_capabilities_tx, prompt_capabilities_rx) =
1047            watch::channel(Self::prompt_capabilities(model.as_deref()));
1048        let worktree_roots: Vec<std::path::PathBuf> = project
1049            .read(cx)
1050            .visible_worktrees(cx)
1051            .map(|worktree| worktree.read(cx).abs_path().as_ref().to_path_buf())
1052            .collect();
1053        let available_skills = SkillsContext::new(worktree_roots, templates.clone(), cx);
1054        Self {
1055            id: acp::SessionId::new(uuid::Uuid::new_v4().to_string()),
1056            prompt_id: PromptId::new(),
1057            updated_at: Utc::now(),
1058            title: None,
1059            pending_title_generation: None,
1060            pending_summary_generation: None,
1061            summary: None,
1062            messages: Vec::new(),
1063            user_store: project.read(cx).user_store(),
1064            running_turn: None,
1065            has_queued_message: false,
1066            pending_message: None,
1067            tools: BTreeMap::default(),
1068            request_token_usage: HashMap::default(),
1069            cumulative_token_usage: TokenUsage::default(),
1070            initial_project_snapshot: {
1071                let project_snapshot = Self::project_snapshot(project.clone(), cx);
1072                cx.foreground_executor()
1073                    .spawn(async move { Some(project_snapshot.await) })
1074                    .shared()
1075            },
1076            context_server_registry,
1077            profile_id,
1078            project_context,
1079            templates,
1080            available_skills,
1081            model,
1082            summarization_model: None,
1083            thinking_enabled: enable_thinking,
1084            speed: None,
1085            thinking_effort,
1086            prompt_capabilities_tx,
1087            prompt_capabilities_rx,
1088            project,
1089            action_log,
1090            imported: false,
1091            subagent_context: None,
1092            draft_prompt: None,
1093            ui_scroll_position: None,
1094            running_subagents: Vec::new(),
1095        }
1096    }
1097
1098    pub fn id(&self) -> &acp::SessionId {
1099        &self.id
1100    }
1101
1102    /// Returns true if this thread was imported from a shared thread.
1103    pub fn is_imported(&self) -> bool {
1104        self.imported
1105    }
1106
1107    pub fn replay(
1108        &mut self,
1109        cx: &mut Context<Self>,
1110    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
1111        let (tx, rx) = mpsc::unbounded();
1112        let stream = ThreadEventStream(tx);
1113        for message in &self.messages {
1114            match message {
1115                Message::User(user_message) => stream.send_user_message(user_message),
1116                Message::Agent(assistant_message) => {
1117                    for content in &assistant_message.content {
1118                        match content {
1119                            AgentMessageContent::Text(text) => stream.send_text(text),
1120                            AgentMessageContent::Thinking { text, .. } => {
1121                                stream.send_thinking(text)
1122                            }
1123                            AgentMessageContent::RedactedThinking(_) => {}
1124                            AgentMessageContent::ToolUse(tool_use) => {
1125                                self.replay_tool_call(
1126                                    tool_use,
1127                                    assistant_message.tool_results.get(&tool_use.id),
1128                                    &stream,
1129                                    cx,
1130                                );
1131                            }
1132                        }
1133                    }
1134                }
1135                Message::Resume => {}
1136            }
1137        }
1138        rx
1139    }
1140
1141    fn replay_tool_call(
1142        &self,
1143        tool_use: &LanguageModelToolUse,
1144        tool_result: Option<&LanguageModelToolResult>,
1145        stream: &ThreadEventStream,
1146        cx: &mut Context<Self>,
1147    ) {
1148        // Extract saved output and status first, so they're available even if tool is not found
1149        let output = tool_result
1150            .as_ref()
1151            .and_then(|result| result.output.clone());
1152        let status = tool_result
1153            .as_ref()
1154            .map_or(acp::ToolCallStatus::Failed, |result| {
1155                if result.is_error {
1156                    acp::ToolCallStatus::Failed
1157                } else {
1158                    acp::ToolCallStatus::Completed
1159                }
1160            });
1161
1162        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
1163            self.context_server_registry
1164                .read(cx)
1165                .servers()
1166                .find_map(|(_, tools)| {
1167                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
1168                        Some(tool.clone())
1169                    } else {
1170                        None
1171                    }
1172                })
1173        });
1174
1175        let Some(tool) = tool else {
1176            // Tool not found (e.g., MCP server not connected after restart),
1177            // but still display the saved result if available.
1178            // We need to send both ToolCall and ToolCallUpdate events because the UI
1179            // only converts raw_output to displayable content in update_fields, not from_acp.
1180            stream
1181                .0
1182                .unbounded_send(Ok(ThreadEvent::ToolCall(
1183                    acp::ToolCall::new(tool_use.id.to_string(), tool_use.name.to_string())
1184                        .status(status)
1185                        .raw_input(tool_use.input.clone()),
1186                )))
1187                .ok();
1188            stream.update_tool_call_fields(
1189                &tool_use.id,
1190                acp::ToolCallUpdateFields::new()
1191                    .status(status)
1192                    .raw_output(output),
1193                None,
1194            );
1195            return;
1196        };
1197
1198        let title = tool.initial_title(tool_use.input.clone(), cx);
1199        let kind = tool.kind();
1200        stream.send_tool_call(
1201            &tool_use.id,
1202            &tool_use.name,
1203            title,
1204            kind,
1205            tool_use.input.clone(),
1206        );
1207
1208        if let Some(output) = output.clone() {
1209            // For replay, we use a dummy cancellation receiver since the tool already completed
1210            let (_cancellation_tx, cancellation_rx) = watch::channel(false);
1211            let tool_event_stream = ToolCallEventStream::new(
1212                tool_use.id.clone(),
1213                stream.clone(),
1214                Some(self.project.read(cx).fs().clone()),
1215                cancellation_rx,
1216            );
1217            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
1218                .log_err();
1219        }
1220
1221        stream.update_tool_call_fields(
1222            &tool_use.id,
1223            acp::ToolCallUpdateFields::new()
1224                .status(status)
1225                .raw_output(output),
1226            None,
1227        );
1228    }
1229
1230    pub fn from_db(
1231        id: acp::SessionId,
1232        db_thread: DbThread,
1233        project: Entity<Project>,
1234        project_context: Entity<ProjectContext>,
1235        context_server_registry: Entity<ContextServerRegistry>,
1236        templates: Arc<Templates>,
1237        cx: &mut Context<Self>,
1238    ) -> Self {
1239        let settings = AgentSettings::get_global(cx);
1240        let profile_id = db_thread
1241            .profile
1242            .unwrap_or_else(|| settings.default_profile.clone());
1243
1244        let mut model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1245            db_thread
1246                .model
1247                .and_then(|model| {
1248                    let model = SelectedModel {
1249                        provider: model.provider.clone().into(),
1250                        model: model.model.into(),
1251                    };
1252                    registry.select_model(&model, cx)
1253                })
1254                .or_else(|| registry.default_model())
1255                .map(|model| model.model)
1256        });
1257
1258        if model.is_none() {
1259            model = Self::resolve_profile_model(&profile_id, cx);
1260        }
1261        if model.is_none() {
1262            model = LanguageModelRegistry::global(cx).update(cx, |registry, _cx| {
1263                registry.default_model().map(|model| model.model)
1264            });
1265        }
1266
1267        let (prompt_capabilities_tx, prompt_capabilities_rx) =
1268            watch::channel(Self::prompt_capabilities(model.as_deref()));
1269
1270        let action_log = cx.new(|_| ActionLog::new(project.clone()));
1271        let worktree_roots: Vec<std::path::PathBuf> = project
1272            .read(cx)
1273            .visible_worktrees(cx)
1274            .map(|worktree| worktree.read(cx).abs_path().as_ref().to_path_buf())
1275            .collect();
1276        let available_skills = SkillsContext::new(worktree_roots, templates.clone(), cx);
1277
1278        Self {
1279            id,
1280            prompt_id: PromptId::new(),
1281            title: if db_thread.title.is_empty() {
1282                None
1283            } else {
1284                Some(db_thread.title.clone())
1285            },
1286            pending_title_generation: None,
1287            pending_summary_generation: None,
1288            summary: db_thread.detailed_summary,
1289            messages: db_thread.messages,
1290            user_store: project.read(cx).user_store(),
1291            running_turn: None,
1292            has_queued_message: false,
1293            pending_message: None,
1294            tools: BTreeMap::default(),
1295            request_token_usage: db_thread.request_token_usage.clone(),
1296            cumulative_token_usage: db_thread.cumulative_token_usage,
1297            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
1298            context_server_registry,
1299            profile_id,
1300            project_context,
1301            templates,
1302            available_skills,
1303            model,
1304            summarization_model: None,
1305            thinking_enabled: db_thread.thinking_enabled,
1306            thinking_effort: db_thread.thinking_effort,
1307            speed: db_thread.speed,
1308            project,
1309            action_log,
1310            updated_at: db_thread.updated_at,
1311            prompt_capabilities_tx,
1312            prompt_capabilities_rx,
1313            imported: db_thread.imported,
1314            subagent_context: db_thread.subagent_context,
1315            draft_prompt: db_thread.draft_prompt,
1316            ui_scroll_position: db_thread.ui_scroll_position.map(|sp| gpui::ListOffset {
1317                item_ix: sp.item_ix,
1318                offset_in_item: gpui::px(sp.offset_in_item),
1319            }),
1320            running_subagents: Vec::new(),
1321        }
1322    }
1323
1324    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
1325        let initial_project_snapshot = self.initial_project_snapshot.clone();
1326        let mut thread = DbThread {
1327            title: self.title().unwrap_or_default(),
1328            messages: self.messages.clone(),
1329            updated_at: self.updated_at,
1330            detailed_summary: self.summary.clone(),
1331            initial_project_snapshot: None,
1332            cumulative_token_usage: self.cumulative_token_usage,
1333            request_token_usage: self.request_token_usage.clone(),
1334            model: self.model.as_ref().map(|model| DbLanguageModel {
1335                provider: model.provider_id().to_string(),
1336                model: model.id().0.to_string(),
1337            }),
1338            profile: Some(self.profile_id.clone()),
1339            imported: self.imported,
1340            subagent_context: self.subagent_context.clone(),
1341            speed: self.speed,
1342            thinking_enabled: self.thinking_enabled,
1343            thinking_effort: self.thinking_effort.clone(),
1344            draft_prompt: self.draft_prompt.clone(),
1345            ui_scroll_position: self.ui_scroll_position.map(|lo| {
1346                crate::db::SerializedScrollPosition {
1347                    item_ix: lo.item_ix,
1348                    offset_in_item: lo.offset_in_item.as_f32(),
1349                }
1350            }),
1351        };
1352
1353        cx.background_spawn(async move {
1354            let initial_project_snapshot = initial_project_snapshot.await;
1355            thread.initial_project_snapshot = initial_project_snapshot;
1356            thread
1357        })
1358    }
1359
1360    /// Create a snapshot of the current project state including git information and unsaved buffers.
1361    fn project_snapshot(
1362        project: Entity<Project>,
1363        cx: &mut Context<Self>,
1364    ) -> Task<Arc<ProjectSnapshot>> {
1365        let task = project::telemetry_snapshot::TelemetrySnapshot::new(&project, cx);
1366        cx.spawn(async move |_, _| {
1367            let snapshot = task.await;
1368
1369            Arc::new(ProjectSnapshot {
1370                worktree_snapshots: snapshot.worktree_snapshots,
1371                timestamp: Utc::now(),
1372            })
1373        })
1374    }
1375
1376    pub fn project_context(&self) -> &Entity<ProjectContext> {
1377        &self.project_context
1378    }
1379
1380    pub fn project(&self) -> &Entity<Project> {
1381        &self.project
1382    }
1383
1384    pub fn action_log(&self) -> &Entity<ActionLog> {
1385        &self.action_log
1386    }
1387
1388    pub fn is_empty(&self) -> bool {
1389        self.messages.is_empty() && self.title.is_none()
1390    }
1391
1392    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1393        self.draft_prompt.as_deref()
1394    }
1395
1396    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1397        self.draft_prompt = prompt;
1398    }
1399
1400    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1401        self.ui_scroll_position
1402    }
1403
1404    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1405        self.ui_scroll_position = position;
1406    }
1407
1408    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
1409        self.model.as_ref()
1410    }
1411
1412    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
1413        let old_usage = self.latest_token_usage();
1414        self.model = Some(model.clone());
1415        let new_caps = Self::prompt_capabilities(self.model.as_deref());
1416        let new_usage = self.latest_token_usage();
1417        if old_usage != new_usage {
1418            cx.emit(TokenUsageUpdated(new_usage));
1419        }
1420        self.prompt_capabilities_tx.send(new_caps).log_err();
1421
1422        for subagent in &self.running_subagents {
1423            subagent
1424                .update(cx, |thread, cx| thread.set_model(model.clone(), cx))
1425                .ok();
1426        }
1427
1428        cx.notify()
1429    }
1430
1431    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
1432        self.summarization_model.as_ref()
1433    }
1434
1435    pub fn set_summarization_model(
1436        &mut self,
1437        model: Option<Arc<dyn LanguageModel>>,
1438        cx: &mut Context<Self>,
1439    ) {
1440        self.summarization_model = model.clone();
1441
1442        for subagent in &self.running_subagents {
1443            subagent
1444                .update(cx, |thread, cx| {
1445                    thread.set_summarization_model(model.clone(), cx)
1446                })
1447                .ok();
1448        }
1449        cx.notify()
1450    }
1451
1452    pub fn thinking_enabled(&self) -> bool {
1453        self.thinking_enabled
1454    }
1455
1456    pub fn set_thinking_enabled(&mut self, enabled: bool, cx: &mut Context<Self>) {
1457        self.thinking_enabled = enabled;
1458
1459        for subagent in &self.running_subagents {
1460            subagent
1461                .update(cx, |thread, cx| thread.set_thinking_enabled(enabled, cx))
1462                .ok();
1463        }
1464        cx.notify();
1465    }
1466
1467    pub fn thinking_effort(&self) -> Option<&String> {
1468        self.thinking_effort.as_ref()
1469    }
1470
1471    pub fn set_thinking_effort(&mut self, effort: Option<String>, cx: &mut Context<Self>) {
1472        self.thinking_effort = effort.clone();
1473
1474        for subagent in &self.running_subagents {
1475            subagent
1476                .update(cx, |thread, cx| {
1477                    thread.set_thinking_effort(effort.clone(), cx)
1478                })
1479                .ok();
1480        }
1481        cx.notify();
1482    }
1483
1484    pub fn speed(&self) -> Option<Speed> {
1485        self.speed
1486    }
1487
1488    pub fn set_speed(&mut self, speed: Speed, cx: &mut Context<Self>) {
1489        self.speed = Some(speed);
1490
1491        for subagent in &self.running_subagents {
1492            subagent
1493                .update(cx, |thread, cx| thread.set_speed(speed, cx))
1494                .ok();
1495        }
1496        cx.notify();
1497    }
1498
1499    pub fn last_message(&self) -> Option<&Message> {
1500        self.messages.last()
1501    }
1502
1503    #[cfg(any(test, feature = "test-support"))]
1504    pub fn last_received_or_pending_message(&self) -> Option<Message> {
1505        if let Some(message) = self.pending_message.clone() {
1506            Some(Message::Agent(message))
1507        } else {
1508            self.messages.last().cloned()
1509        }
1510    }
1511
1512    pub fn add_default_tools(
1513        &mut self,
1514        environment: Rc<dyn ThreadEnvironment>,
1515        cx: &mut Context<Self>,
1516    ) {
1517        // Only update the agent location for the root thread, not for subagents.
1518        let update_agent_location = self.parent_thread_id().is_none();
1519
1520        let language_registry = self.project.read(cx).languages().clone();
1521        self.add_tool(CopyPathTool::new(self.project.clone()));
1522        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
1523        self.add_tool(DeletePathTool::new(
1524            self.project.clone(),
1525            self.action_log.clone(),
1526        ));
1527        self.add_tool(DiagnosticsTool::new(self.project.clone()));
1528        self.add_tool(EditFileTool::new(
1529            self.project.clone(),
1530            cx.weak_entity(),
1531            language_registry.clone(),
1532            Templates::new(),
1533        ));
1534        self.add_tool(StreamingEditFileTool::new(
1535            self.project.clone(),
1536            cx.weak_entity(),
1537            self.action_log.clone(),
1538            language_registry,
1539        ));
1540        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
1541        self.add_tool(FindPathTool::new(self.project.clone()));
1542        self.add_tool(GrepTool::new(self.project.clone()));
1543        self.add_tool(ListDirectoryTool::new(self.project.clone()));
1544        self.add_tool(MovePathTool::new(self.project.clone()));
1545        self.add_tool(NowTool);
1546        self.add_tool(OpenTool::new(self.project.clone()));
1547        if cx.has_flag::<UpdatePlanToolFeatureFlag>() {
1548            self.add_tool(UpdatePlanTool);
1549        }
1550        self.add_tool(ReadFileTool::new(
1551            self.project.clone(),
1552            self.action_log.clone(),
1553            update_agent_location,
1554        ));
1555        self.add_tool(SaveFileTool::new(self.project.clone()));
1556        self.add_tool(RestoreFileFromDiskTool::new(self.project.clone()));
1557        self.add_tool(TerminalTool::new(self.project.clone(), environment.clone()));
1558        self.add_tool(WebSearchTool);
1559
1560        if self.depth() < MAX_SUBAGENT_DEPTH {
1561            self.add_tool(SpawnAgentTool::new(environment));
1562        }
1563    }
1564
1565    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
1566        debug_assert!(
1567            !self.tools.contains_key(T::NAME),
1568            "Duplicate tool name: {}",
1569            T::NAME,
1570        );
1571        self.tools.insert(T::NAME.into(), tool.erase());
1572    }
1573
/// Unregisters the tool stored under `name`.
///
/// Returns `true` when a tool with that name was registered, `false` otherwise.
/// Only compiled for tests and the `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub fn remove_tool(&mut self, name: &str) -> bool {
    let removed = self.tools.remove(name);
    removed.is_some()
}
1578
1579    pub fn profile(&self) -> &AgentProfileId {
1580        &self.profile_id
1581    }
1582
1583    pub fn set_profile(&mut self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
1584        if self.profile_id == profile_id {
1585            return;
1586        }
1587
1588        self.profile_id = profile_id.clone();
1589
1590        // Swap to the profile's preferred model when available.
1591        if let Some(model) = Self::resolve_profile_model(&self.profile_id, cx) {
1592            self.set_model(model, cx);
1593        }
1594
1595        for subagent in &self.running_subagents {
1596            subagent
1597                .update(cx, |thread, cx| thread.set_profile(profile_id.clone(), cx))
1598                .ok();
1599        }
1600    }
1601
1602    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1603        for subagent in self.running_subagents.drain(..) {
1604            if let Some(subagent) = subagent.upgrade() {
1605                subagent.update(cx, |thread, cx| thread.cancel(cx)).detach();
1606            }
1607        }
1608
1609        let Some(running_turn) = self.running_turn.take() else {
1610            self.flush_pending_message(cx);
1611            return Task::ready(());
1612        };
1613
1614        let turn_task = running_turn.cancel();
1615
1616        cx.spawn(async move |this, cx| {
1617            turn_task.await;
1618            this.update(cx, |this, cx| {
1619                this.flush_pending_message(cx);
1620            })
1621            .ok();
1622        })
1623    }
1624
1625    pub fn set_has_queued_message(&mut self, has_queued: bool) {
1626        self.has_queued_message = has_queued;
1627    }
1628
1629    pub fn has_queued_message(&self) -> bool {
1630        self.has_queued_message
1631    }
1632
1633    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1634        let Some(last_user_message) = self.last_user_message() else {
1635            return;
1636        };
1637
1638        self.request_token_usage
1639            .insert(last_user_message.id.clone(), update);
1640        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1641        cx.notify();
1642    }
1643
1644    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1645        self.cancel(cx).detach();
1646        // Clear pending message since cancel will try to flush it asynchronously,
1647        // and we don't want that content to be added after we truncate
1648        self.pending_message.take();
1649        let Some(position) = self.messages.iter().position(
1650            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1651        ) else {
1652            return Err(anyhow!("Message not found"));
1653        };
1654
1655        for message in self.messages.drain(position..) {
1656            match message {
1657                Message::User(message) => {
1658                    self.request_token_usage.remove(&message.id);
1659                }
1660                Message::Agent(_) | Message::Resume => {}
1661            }
1662        }
1663        self.clear_summary();
1664        cx.notify();
1665        Ok(())
1666    }
1667
1668    pub fn latest_request_token_usage(&self) -> Option<language_model::TokenUsage> {
1669        let last_user_message = self.last_user_message()?;
1670        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1671        Some(*tokens)
1672    }
1673
1674    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1675        let usage = self.latest_request_token_usage()?;
1676        let model = self.model.clone()?;
1677        Some(acp_thread::TokenUsage {
1678            max_tokens: model.max_token_count(),
1679            max_output_tokens: model.max_output_tokens(),
1680            used_tokens: usage.total_tokens(),
1681            input_tokens: usage.input_tokens,
1682            output_tokens: usage.output_tokens,
1683        })
1684    }
1685
1686    /// Get the total input token count as of the message before the given message.
1687    ///
1688    /// Returns `None` if:
1689    /// - `target_id` is the first message (no previous message)
1690    /// - The previous message hasn't received a response yet (no usage data)
1691    /// - `target_id` is not found in the messages
1692    pub fn tokens_before_message(&self, target_id: &UserMessageId) -> Option<u64> {
1693        let mut previous_user_message_id: Option<&UserMessageId> = None;
1694
1695        for message in &self.messages {
1696            if let Message::User(user_msg) = message {
1697                if &user_msg.id == target_id {
1698                    let prev_id = previous_user_message_id?;
1699                    let usage = self.request_token_usage.get(prev_id)?;
1700                    return Some(usage.input_tokens);
1701                }
1702                previous_user_message_id = Some(&user_msg.id);
1703            }
1704        }
1705        None
1706    }
1707
1708    /// Look up the active profile and resolve its preferred model if one is configured.
1709    fn resolve_profile_model(
1710        profile_id: &AgentProfileId,
1711        cx: &mut Context<Self>,
1712    ) -> Option<Arc<dyn LanguageModel>> {
1713        let selection = AgentSettings::get_global(cx)
1714            .profiles
1715            .get(profile_id)?
1716            .default_model
1717            .clone()?;
1718        Self::resolve_model_from_selection(&selection, cx)
1719    }
1720
1721    /// Translate a stored model selection into the configured model from the registry.
1722    fn resolve_model_from_selection(
1723        selection: &LanguageModelSelection,
1724        cx: &mut Context<Self>,
1725    ) -> Option<Arc<dyn LanguageModel>> {
1726        let selected = SelectedModel {
1727            provider: LanguageModelProviderId::from(selection.provider.0.clone()),
1728            model: LanguageModelId::from(selection.model.clone()),
1729        };
1730        LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
1731            registry
1732                .select_model(&selected, cx)
1733                .map(|configured| configured.model)
1734        })
1735    }
1736
1737    pub fn resume(
1738        &mut self,
1739        cx: &mut Context<Self>,
1740    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1741        self.messages.push(Message::Resume);
1742        cx.notify();
1743
1744        log::debug!("Total messages in thread: {}", self.messages.len());
1745        self.run_turn(cx)
1746    }
1747
1748    /// Sending a message results in the model streaming a response, which could include tool calls.
1749    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1750    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1751    pub fn send<T>(
1752        &mut self,
1753        id: UserMessageId,
1754        content: impl IntoIterator<Item = T>,
1755        cx: &mut Context<Self>,
1756    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1757    where
1758        T: Into<UserMessageContent>,
1759    {
1760        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1761        log::debug!("Thread::send content: {:?}", content);
1762
1763        self.messages
1764            .push(Message::User(UserMessage { id, content }));
1765        cx.notify();
1766
1767        self.send_existing(cx)
1768    }
1769
1770    pub fn send_existing(
1771        &mut self,
1772        cx: &mut Context<Self>,
1773    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1774        let model = self.model().context("No language model configured")?;
1775
1776        log::info!("Thread::send called with model: {}", model.name().0);
1777        self.advance_prompt_id();
1778
1779        log::debug!("Total messages in thread: {}", self.messages.len());
1780        self.run_turn(cx)
1781    }
1782
1783    pub fn push_acp_user_block(
1784        &mut self,
1785        id: UserMessageId,
1786        blocks: impl IntoIterator<Item = acp::ContentBlock>,
1787        path_style: PathStyle,
1788        cx: &mut Context<Self>,
1789    ) {
1790        let content = blocks
1791            .into_iter()
1792            .map(|block| UserMessageContent::from_content_block(block, path_style))
1793            .collect::<Vec<_>>();
1794        self.messages
1795            .push(Message::User(UserMessage { id, content }));
1796        cx.notify();
1797    }
1798
1799    pub fn push_acp_agent_block(&mut self, block: acp::ContentBlock, cx: &mut Context<Self>) {
1800        let text = match block {
1801            acp::ContentBlock::Text(text_content) => text_content.text,
1802            acp::ContentBlock::Image(_) => "[image]".to_string(),
1803            acp::ContentBlock::Audio(_) => "[audio]".to_string(),
1804            acp::ContentBlock::ResourceLink(resource_link) => resource_link.uri,
1805            acp::ContentBlock::Resource(resource) => match resource.resource {
1806                acp::EmbeddedResourceResource::TextResourceContents(resource) => resource.uri,
1807                acp::EmbeddedResourceResource::BlobResourceContents(resource) => resource.uri,
1808                _ => "[resource]".to_string(),
1809            },
1810            _ => "[unknown]".to_string(),
1811        };
1812
1813        self.messages.push(Message::Agent(AgentMessage {
1814            content: vec![AgentMessageContent::Text(text)],
1815            ..Default::default()
1816        }));
1817        cx.notify();
1818    }
1819
1820    fn run_turn(
1821        &mut self,
1822        cx: &mut Context<Self>,
1823    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1824        // Flush the old pending message synchronously before cancelling,
1825        // to avoid a race where the detached cancel task might flush the NEW
1826        // turn's pending message instead of the old one.
1827        self.flush_pending_message(cx);
1828        self.cancel(cx).detach();
1829
1830        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
1831        let event_stream = ThreadEventStream(events_tx);
1832        let message_ix = self.messages.len().saturating_sub(1);
1833        self.clear_summary();
1834        let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
1835        self.running_turn = Some(RunningTurn {
1836            event_stream: event_stream.clone(),
1837            tools: self.enabled_tools(cx),
1838            cancellation_tx,
1839            streaming_tool_inputs: HashMap::default(),
1840            _task: cx.spawn(async move |this, cx| {
1841                log::debug!("Starting agent turn execution");
1842
1843                let turn_result =
1844                    Self::run_turn_internal(&this, &event_stream, cancellation_rx.clone(), cx)
1845                        .await;
1846
1847                // Check if we were cancelled - if so, cancel() already took running_turn
1848                // and we shouldn't touch it (it might be a NEW turn now)
1849                let was_cancelled = *cancellation_rx.borrow();
1850                if was_cancelled {
1851                    log::debug!("Turn was cancelled, skipping cleanup");
1852                    return;
1853                }
1854
1855                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));
1856
1857                match turn_result {
1858                    Ok(()) => {
1859                        log::debug!("Turn execution completed");
1860                        event_stream.send_stop(acp::StopReason::EndTurn);
1861                    }
1862                    Err(error) => {
1863                        log::error!("Turn execution failed: {:?}", error);
1864                        match error.downcast::<CompletionError>() {
1865                            Ok(CompletionError::Refusal) => {
1866                                event_stream.send_stop(acp::StopReason::Refusal);
1867                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
1868                            }
1869                            Ok(CompletionError::MaxTokens) => {
1870                                event_stream.send_stop(acp::StopReason::MaxTokens);
1871                            }
1872                            Ok(CompletionError::Other(error)) | Err(error) => {
1873                                event_stream.send_error(error);
1874                            }
1875                        }
1876                    }
1877                }
1878
1879                _ = this.update(cx, |this, _| this.running_turn.take());
1880            }),
1881        });
1882        Ok(events_rx)
1883    }
1884
1885    async fn run_turn_internal(
1886        this: &WeakEntity<Self>,
1887        event_stream: &ThreadEventStream,
1888        mut cancellation_rx: watch::Receiver<bool>,
1889        cx: &mut AsyncApp,
1890    ) -> Result<()> {
1891        let mut attempt = 0;
1892        let mut intent = CompletionIntent::UserPrompt;
1893        loop {
1894            // Re-read the model and refresh tools on each iteration so that
1895            // mid-turn changes (e.g. the user switches model, toggles tools,
1896            // or changes profile) take effect between tool-call rounds.
1897            let (model, request) = this.update(cx, |this, cx| {
1898                let model = this.model.clone().context("No language model configured")?;
1899                this.refresh_turn_tools(cx);
1900                let request = this.build_completion_request(intent, cx)?;
1901                anyhow::Ok((model, request))
1902            })??;
1903
1904            telemetry::event!(
1905                "Agent Thread Completion",
1906                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
1907                parent_thread_id = this.read_with(cx, |this, _| this
1908                    .parent_thread_id()
1909                    .map(|id| id.to_string()))?,
1910                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
1911                model = model.telemetry_id(),
1912                model_provider = model.provider_id().to_string(),
1913                attempt
1914            );
1915
1916            log::debug!("Calling model.stream_completion, attempt {}", attempt);
1917
1918            let (mut events, mut error) = match model.stream_completion(request, cx).await {
1919                Ok(events) => (events.fuse(), None),
1920                Err(err) => (stream::empty().boxed().fuse(), Some(err)),
1921            };
1922            let mut tool_results: FuturesUnordered<Task<LanguageModelToolResult>> =
1923                FuturesUnordered::new();
1924            let mut early_tool_results: Vec<LanguageModelToolResult> = Vec::new();
1925            let mut cancelled = false;
1926            loop {
1927                // Race between getting the first event, tool completion, and cancellation.
1928                let first_event = futures::select! {
1929                    event = events.next().fuse() => event,
1930                    tool_result = futures::StreamExt::select_next_some(&mut tool_results) => {
1931                        let is_error = tool_result.is_error;
1932                        let is_still_streaming = this
1933                            .read_with(cx, |this, _cx| {
1934                                this.running_turn
1935                                    .as_ref()
1936                                    .and_then(|turn| turn.streaming_tool_inputs.get(&tool_result.tool_use_id))
1937                                    .map_or(false, |inputs| !inputs.has_received_final())
1938                            })
1939                            .unwrap_or(false);
1940
1941                        early_tool_results.push(tool_result);
1942
1943                        // Only break if the tool errored and we are still
1944                        // streaming the input of the tool. If the tool errored
1945                        // but we are no longer streaming its input (i.e. there
1946                        // are parallel tool calls) we want to continue
1947                        // processing those tool inputs.
1948                        if is_error && is_still_streaming {
1949                            break;
1950                        }
1951                        continue;
1952                    }
1953                    _ = cancellation_rx.changed().fuse() => {
1954                        if *cancellation_rx.borrow() {
1955                            cancelled = true;
1956                            break;
1957                        }
1958                        continue;
1959                    }
1960                };
1961                let Some(first_event) = first_event else {
1962                    break;
1963                };
1964
1965                // Collect all immediately available events to process as a batch
1966                let mut batch = vec![first_event];
1967                while let Some(event) = events.next().now_or_never().flatten() {
1968                    batch.push(event);
1969                }
1970
1971                // Process the batch in a single update
1972                let batch_result = this.update(cx, |this, cx| {
1973                    let mut batch_tool_results = Vec::new();
1974                    let mut batch_error = None;
1975
1976                    for event in batch {
1977                        log::trace!("Received completion event: {:?}", event);
1978                        match event {
1979                            Ok(event) => {
1980                                match this.handle_completion_event(
1981                                    event,
1982                                    event_stream,
1983                                    cancellation_rx.clone(),
1984                                    cx,
1985                                ) {
1986                                    Ok(Some(task)) => batch_tool_results.push(task),
1987                                    Ok(None) => {}
1988                                    Err(err) => {
1989                                        batch_error = Some(err);
1990                                        break;
1991                                    }
1992                                }
1993                            }
1994                            Err(err) => {
1995                                batch_error = Some(err.into());
1996                                break;
1997                            }
1998                        }
1999                    }
2000
2001                    cx.notify();
2002                    (batch_tool_results, batch_error)
2003                })?;
2004
2005                tool_results.extend(batch_result.0);
2006                if let Some(err) = batch_result.1 {
2007                    error = Some(err.downcast()?);
2008                    break;
2009                }
2010            }
2011
2012            // Drop the stream to release the rate limit permit before tool execution.
2013            // The stream holds a semaphore guard that limits concurrent requests.
2014            // Without this, the permit would be held during potentially long-running
2015            // tool execution, which could cause deadlocks when tools spawn subagents
2016            // that need their own permits.
2017            drop(events);
2018
2019            // Drop streaming tool input senders that never received their final input.
2020            // This prevents deadlock when the LLM stream ends (e.g. because of an error)
2021            // before sending a tool use with `is_input_complete: true`.
2022            this.update(cx, |this, _cx| {
2023                if let Some(running_turn) = this.running_turn.as_mut() {
2024                    if running_turn.streaming_tool_inputs.is_empty() {
2025                        return;
2026                    }
2027                    log::warn!("Dropping partial tool inputs because the stream ended");
2028                    running_turn.streaming_tool_inputs.drain();
2029                }
2030            })?;
2031
2032            let end_turn = tool_results.is_empty() && early_tool_results.is_empty();
2033
2034            for tool_result in early_tool_results {
2035                Self::process_tool_result(this, event_stream, cx, tool_result)?;
2036            }
2037            while let Some(tool_result) = tool_results.next().await {
2038                Self::process_tool_result(this, event_stream, cx, tool_result)?;
2039            }
2040
2041            this.update(cx, |this, cx| {
2042                this.flush_pending_message(cx);
2043                if this.title.is_none() && this.pending_title_generation.is_none() {
2044                    this.generate_title(cx);
2045                }
2046            })?;
2047
2048            if cancelled {
2049                log::debug!("Turn cancelled by user, exiting");
2050                return Ok(());
2051            }
2052
2053            if let Some(error) = error {
2054                attempt += 1;
2055                let retry = this.update(cx, |this, cx| {
2056                    let user_store = this.user_store.read(cx);
2057                    this.handle_completion_error(error, attempt, user_store.plan())
2058                })??;
2059                let timer = cx.background_executor().timer(retry.duration);
2060                event_stream.send_retry(retry);
2061                futures::select! {
2062                    _ = timer.fuse() => {}
2063                    _ = cancellation_rx.changed().fuse() => {
2064                        if *cancellation_rx.borrow() {
2065                            log::debug!("Turn cancelled during retry delay, exiting");
2066                            return Ok(());
2067                        }
2068                    }
2069                }
2070                this.update(cx, |this, _cx| {
2071                    if let Some(Message::Agent(message)) = this.messages.last() {
2072                        if message.tool_results.is_empty() {
2073                            intent = CompletionIntent::UserPrompt;
2074                            this.messages.push(Message::Resume);
2075                        }
2076                    }
2077                })?;
2078            } else if end_turn {
2079                return Ok(());
2080            } else {
2081                let has_queued = this.update(cx, |this, _| this.has_queued_message())?;
2082                if has_queued {
2083                    log::debug!("Queued message found, ending turn at message boundary");
2084                    return Ok(());
2085                }
2086                intent = CompletionIntent::ToolResults;
2087                attempt = 0;
2088            }
2089        }
2090    }
2091
2092    fn process_tool_result(
2093        this: &WeakEntity<Thread>,
2094        event_stream: &ThreadEventStream,
2095        cx: &mut AsyncApp,
2096        tool_result: LanguageModelToolResult,
2097    ) -> Result<(), anyhow::Error> {
2098        log::debug!("Tool finished {:?}", tool_result);
2099
2100        event_stream.update_tool_call_fields(
2101            &tool_result.tool_use_id,
2102            acp::ToolCallUpdateFields::new()
2103                .status(if tool_result.is_error {
2104                    acp::ToolCallStatus::Failed
2105                } else {
2106                    acp::ToolCallStatus::Completed
2107                })
2108                .raw_output(tool_result.output.clone()),
2109            None,
2110        );
2111        this.update(cx, |this, _cx| {
2112            this.pending_message()
2113                .tool_results
2114                .insert(tool_result.tool_use_id.clone(), tool_result);
2115        })?;
2116        Ok(())
2117    }
2118
2119    fn handle_completion_error(
2120        &mut self,
2121        error: LanguageModelCompletionError,
2122        attempt: u8,
2123        plan: Option<Plan>,
2124    ) -> Result<acp_thread::RetryStatus> {
2125        let Some(model) = self.model.as_ref() else {
2126            return Err(anyhow!(error));
2127        };
2128
2129        let auto_retry = if model.provider_id() == ZED_CLOUD_PROVIDER_ID {
2130            plan.is_some()
2131        } else {
2132            true
2133        };
2134
2135        if !auto_retry {
2136            return Err(anyhow!(error));
2137        }
2138
2139        let Some(strategy) = Self::retry_strategy_for(&error) else {
2140            return Err(anyhow!(error));
2141        };
2142
2143        let max_attempts = match &strategy {
2144            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
2145            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
2146        };
2147
2148        if attempt > max_attempts {
2149            return Err(anyhow!(error));
2150        }
2151
2152        let delay = match &strategy {
2153            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
2154                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
2155                Duration::from_secs(delay_secs)
2156            }
2157            RetryStrategy::Fixed { delay, .. } => *delay,
2158        };
2159        log::debug!("Retry attempt {attempt} with delay {delay:?}");
2160
2161        Ok(acp_thread::RetryStatus {
2162            last_error: error.to_string().into(),
2163            attempt: attempt as usize,
2164            max_attempts: max_attempts as usize,
2165            started_at: Instant::now(),
2166            duration: delay,
2167        })
2168    }
2169
2170    /// A helper method that's called on every streamed completion event.
2171    /// Returns an optional tool result task, which the main agentic loop will
2172    /// send back to the model when it resolves.
2173    fn handle_completion_event(
2174        &mut self,
2175        event: LanguageModelCompletionEvent,
2176        event_stream: &ThreadEventStream,
2177        cancellation_rx: watch::Receiver<bool>,
2178        cx: &mut Context<Self>,
2179    ) -> Result<Option<Task<LanguageModelToolResult>>> {
2180        log::trace!("Handling streamed completion event: {:?}", event);
2181        use LanguageModelCompletionEvent::*;
2182
2183        match event {
2184            StartMessage { .. } => {
2185                self.flush_pending_message(cx);
2186                self.pending_message = Some(AgentMessage::default());
2187            }
2188            Text(new_text) => self.handle_text_event(new_text, event_stream),
2189            Thinking { text, signature } => {
2190                self.handle_thinking_event(text, signature, event_stream)
2191            }
2192            RedactedThinking { data } => self.handle_redacted_thinking_event(data),
2193            ReasoningDetails(details) => {
2194                let last_message = self.pending_message();
2195                // Store the last non-empty reasoning_details (overwrites earlier ones)
2196                // This ensures we keep the encrypted reasoning with signatures, not the early text reasoning
2197                if let serde_json::Value::Array(ref arr) = details {
2198                    if !arr.is_empty() {
2199                        last_message.reasoning_details = Some(details);
2200                    }
2201                } else {
2202                    last_message.reasoning_details = Some(details);
2203                }
2204            }
2205            ToolUse(tool_use) => {
2206                return Ok(self.handle_tool_use_event(tool_use, event_stream, cancellation_rx, cx));
2207            }
2208            ToolUseJsonParseError {
2209                id,
2210                tool_name,
2211                raw_input,
2212                json_parse_error,
2213            } => {
2214                return Ok(Some(Task::ready(
2215                    self.handle_tool_use_json_parse_error_event(
2216                        id,
2217                        tool_name,
2218                        raw_input,
2219                        json_parse_error,
2220                        event_stream,
2221                    ),
2222                )));
2223            }
2224            UsageUpdate(usage) => {
2225                telemetry::event!(
2226                    "Agent Thread Completion Usage Updated",
2227                    thread_id = self.id.to_string(),
2228                    parent_thread_id = self.parent_thread_id().map(|id| id.to_string()),
2229                    prompt_id = self.prompt_id.to_string(),
2230                    model = self.model.as_ref().map(|m| m.telemetry_id()),
2231                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
2232                    input_tokens = usage.input_tokens,
2233                    output_tokens = usage.output_tokens,
2234                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
2235                    cache_read_input_tokens = usage.cache_read_input_tokens,
2236                );
2237                self.update_token_usage(usage, cx);
2238            }
2239            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
2240            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
2241            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
2242            Started | Queued { .. } => {}
2243        }
2244
2245        Ok(None)
2246    }
2247
2248    fn handle_text_event(&mut self, new_text: String, event_stream: &ThreadEventStream) {
2249        event_stream.send_text(&new_text);
2250
2251        let last_message = self.pending_message();
2252        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
2253            text.push_str(&new_text);
2254        } else {
2255            last_message
2256                .content
2257                .push(AgentMessageContent::Text(new_text));
2258        }
2259    }
2260
2261    fn handle_thinking_event(
2262        &mut self,
2263        new_text: String,
2264        new_signature: Option<String>,
2265        event_stream: &ThreadEventStream,
2266    ) {
2267        event_stream.send_thinking(&new_text);
2268
2269        let last_message = self.pending_message();
2270        if let Some(AgentMessageContent::Thinking { text, signature }) =
2271            last_message.content.last_mut()
2272        {
2273            text.push_str(&new_text);
2274            *signature = new_signature.or(signature.take());
2275        } else {
2276            last_message.content.push(AgentMessageContent::Thinking {
2277                text: new_text,
2278                signature: new_signature,
2279            });
2280        }
2281    }
2282
2283    fn handle_redacted_thinking_event(&mut self, data: String) {
2284        let last_message = self.pending_message();
2285        last_message
2286            .content
2287            .push(AgentMessageContent::RedactedThinking(data));
2288    }
2289
2290    fn handle_tool_use_event(
2291        &mut self,
2292        tool_use: LanguageModelToolUse,
2293        event_stream: &ThreadEventStream,
2294        cancellation_rx: watch::Receiver<bool>,
2295        cx: &mut Context<Self>,
2296    ) -> Option<Task<LanguageModelToolResult>> {
2297        cx.notify();
2298
2299        let tool = self.tool(tool_use.name.as_ref());
2300        let mut title = SharedString::from(&tool_use.name);
2301        let mut kind = acp::ToolKind::Other;
2302        if let Some(tool) = tool.as_ref() {
2303            title = tool.initial_title(tool_use.input.clone(), cx);
2304            kind = tool.kind();
2305        }
2306
2307        self.send_or_update_tool_use(&tool_use, title, kind, event_stream);
2308
2309        let Some(tool) = tool else {
2310            let content = format!("No tool named {} exists", tool_use.name);
2311            return Some(Task::ready(LanguageModelToolResult {
2312                content: LanguageModelToolResultContent::Text(Arc::from(content)),
2313                tool_use_id: tool_use.id,
2314                tool_name: tool_use.name,
2315                is_error: true,
2316                output: None,
2317            }));
2318        };
2319
2320        if !tool_use.is_input_complete {
2321            if tool.supports_input_streaming() {
2322                let running_turn = self.running_turn.as_mut()?;
2323                if let Some(sender) = running_turn.streaming_tool_inputs.get(&tool_use.id) {
2324                    sender.send_partial(tool_use.input);
2325                    return None;
2326                }
2327
2328                let (sender, tool_input) = ToolInputSender::channel();
2329                sender.send_partial(tool_use.input);
2330                running_turn
2331                    .streaming_tool_inputs
2332                    .insert(tool_use.id.clone(), sender);
2333
2334                let tool = tool.clone();
2335                log::debug!("Running streaming tool {}", tool_use.name);
2336                return Some(self.run_tool(
2337                    tool,
2338                    tool_input,
2339                    tool_use.id,
2340                    tool_use.name,
2341                    event_stream,
2342                    cancellation_rx,
2343                    cx,
2344                ));
2345            } else {
2346                return None;
2347            }
2348        }
2349
2350        if let Some(sender) = self
2351            .running_turn
2352            .as_mut()?
2353            .streaming_tool_inputs
2354            .remove(&tool_use.id)
2355        {
2356            sender.send_final(tool_use.input);
2357            return None;
2358        }
2359
2360        log::debug!("Running tool {}", tool_use.name);
2361        let tool_input = ToolInput::ready(tool_use.input);
2362        Some(self.run_tool(
2363            tool,
2364            tool_input,
2365            tool_use.id,
2366            tool_use.name,
2367            event_stream,
2368            cancellation_rx,
2369            cx,
2370        ))
2371    }
2372
2373    fn run_tool(
2374        &self,
2375        tool: Arc<dyn AnyAgentTool>,
2376        tool_input: ToolInput<serde_json::Value>,
2377        tool_use_id: LanguageModelToolUseId,
2378        tool_name: Arc<str>,
2379        event_stream: &ThreadEventStream,
2380        cancellation_rx: watch::Receiver<bool>,
2381        cx: &mut Context<Self>,
2382    ) -> Task<LanguageModelToolResult> {
2383        let fs = self.project.read(cx).fs().clone();
2384        let tool_event_stream = ToolCallEventStream::new(
2385            tool_use_id.clone(),
2386            event_stream.clone(),
2387            Some(fs),
2388            cancellation_rx,
2389        );
2390        tool_event_stream.update_fields(
2391            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress),
2392        );
2393        let supports_images = self.model().is_some_and(|model| model.supports_images());
2394        let tool_result = tool.run(tool_input, tool_event_stream, cx);
2395        cx.foreground_executor().spawn(async move {
2396            let (is_error, output) = match tool_result.await {
2397                Ok(mut output) => {
2398                    if let LanguageModelToolResultContent::Image(_) = &output.llm_output
2399                        && !supports_images
2400                    {
2401                        output = AgentToolOutput::from_error(
2402                            "Attempted to read an image, but this model doesn't support it.",
2403                        );
2404                        (true, output)
2405                    } else {
2406                        (false, output)
2407                    }
2408                }
2409                Err(output) => (true, output),
2410            };
2411
2412            LanguageModelToolResult {
2413                tool_use_id,
2414                tool_name,
2415                is_error,
2416                content: output.llm_output,
2417                output: Some(output.raw_output),
2418            }
2419        })
2420    }
2421
2422    fn handle_tool_use_json_parse_error_event(
2423        &mut self,
2424        tool_use_id: LanguageModelToolUseId,
2425        tool_name: Arc<str>,
2426        raw_input: Arc<str>,
2427        json_parse_error: String,
2428        event_stream: &ThreadEventStream,
2429    ) -> LanguageModelToolResult {
2430        let tool_use = LanguageModelToolUse {
2431            id: tool_use_id.clone(),
2432            name: tool_name.clone(),
2433            raw_input: raw_input.to_string(),
2434            input: serde_json::json!({}),
2435            is_input_complete: true,
2436            thought_signature: None,
2437        };
2438        self.send_or_update_tool_use(
2439            &tool_use,
2440            SharedString::from(&tool_use.name),
2441            acp::ToolKind::Other,
2442            event_stream,
2443        );
2444
2445        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
2446        LanguageModelToolResult {
2447            tool_use_id,
2448            tool_name,
2449            is_error: true,
2450            content: LanguageModelToolResultContent::Text(tool_output.into()),
2451            output: Some(serde_json::Value::String(raw_input.to_string())),
2452        }
2453    }
2454
2455    fn send_or_update_tool_use(
2456        &mut self,
2457        tool_use: &LanguageModelToolUse,
2458        title: SharedString,
2459        kind: acp::ToolKind,
2460        event_stream: &ThreadEventStream,
2461    ) {
2462        // Ensure the last message ends in the current tool use
2463        let last_message = self.pending_message();
2464
2465        let has_tool_use = last_message.content.iter_mut().rev().any(|content| {
2466            if let AgentMessageContent::ToolUse(last_tool_use) = content {
2467                if last_tool_use.id == tool_use.id {
2468                    *last_tool_use = tool_use.clone();
2469                    return true;
2470                }
2471            }
2472            false
2473        });
2474
2475        if !has_tool_use {
2476            event_stream.send_tool_call(
2477                &tool_use.id,
2478                &tool_use.name,
2479                title,
2480                kind,
2481                tool_use.input.clone(),
2482            );
2483            last_message
2484                .content
2485                .push(AgentMessageContent::ToolUse(tool_use.clone()));
2486        } else {
2487            event_stream.update_tool_call_fields(
2488                &tool_use.id,
2489                acp::ToolCallUpdateFields::new()
2490                    .title(title.as_str())
2491                    .kind(kind)
2492                    .raw_input(tool_use.input.clone()),
2493                None,
2494            );
2495        }
2496    }
2497
    /// Returns a clone of the thread's title, or `None` if no title has been set.
    pub fn title(&self) -> Option<SharedString> {
        self.title.clone()
    }
2501
    /// Returns `true` while a summary generation task is stored in
    /// `pending_summary_generation`.
    pub fn is_generating_summary(&self) -> bool {
        self.pending_summary_generation.is_some()
    }
2505
    /// Returns `true` while a title generation task is stored in
    /// `pending_title_generation`.
    pub fn is_generating_title(&self) -> bool {
        self.pending_title_generation.is_some()
    }
2509
2510    pub fn summary(&mut self, cx: &mut Context<Self>) -> Shared<Task<Option<SharedString>>> {
2511        if let Some(summary) = self.summary.as_ref() {
2512            return Task::ready(Some(summary.clone())).shared();
2513        }
2514        if let Some(task) = self.pending_summary_generation.clone() {
2515            return task;
2516        }
2517        let Some(model) = self.summarization_model.clone() else {
2518            log::error!("No summarization model available");
2519            return Task::ready(None).shared();
2520        };
2521        let mut request = LanguageModelRequest {
2522            intent: Some(CompletionIntent::ThreadContextSummarization),
2523            temperature: AgentSettings::temperature_for_model(&model, cx),
2524            ..Default::default()
2525        };
2526
2527        for message in &self.messages {
2528            request.messages.extend(message.to_request());
2529        }
2530
2531        request.messages.push(LanguageModelRequestMessage {
2532            role: Role::User,
2533            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
2534            cache: false,
2535            reasoning_details: None,
2536        });
2537
2538        let task = cx
2539            .spawn(async move |this, cx| {
2540                let mut summary = String::new();
2541                let mut messages = model.stream_completion(request, cx).await.log_err()?;
2542                while let Some(event) = messages.next().await {
2543                    let event = event.log_err()?;
2544                    let text = match event {
2545                        LanguageModelCompletionEvent::Text(text) => text,
2546                        _ => continue,
2547                    };
2548
2549                    let mut lines = text.lines();
2550                    summary.extend(lines.next());
2551                }
2552
2553                log::debug!("Setting summary: {}", summary);
2554                let summary = SharedString::from(summary);
2555
2556                this.update(cx, |this, cx| {
2557                    this.summary = Some(summary.clone());
2558                    this.pending_summary_generation = None;
2559                    cx.notify()
2560                })
2561                .ok()?;
2562
2563                Some(summary)
2564            })
2565            .shared();
2566        self.pending_summary_generation = Some(task.clone());
2567        task
2568    }
2569
2570    pub fn generate_title(&mut self, cx: &mut Context<Self>) {
2571        let Some(model) = self.summarization_model.clone() else {
2572            return;
2573        };
2574
2575        log::debug!(
2576            "Generating title with model: {:?}",
2577            self.summarization_model.as_ref().map(|model| model.name())
2578        );
2579        let mut request = LanguageModelRequest {
2580            intent: Some(CompletionIntent::ThreadSummarization),
2581            temperature: AgentSettings::temperature_for_model(&model, cx),
2582            ..Default::default()
2583        };
2584
2585        for message in &self.messages {
2586            request.messages.extend(message.to_request());
2587        }
2588
2589        request.messages.push(LanguageModelRequestMessage {
2590            role: Role::User,
2591            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
2592            cache: false,
2593            reasoning_details: None,
2594        });
2595        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
2596            let mut title = String::new();
2597
2598            let generate = async {
2599                let mut messages = model.stream_completion(request, cx).await?;
2600                while let Some(event) = messages.next().await {
2601                    let event = event?;
2602                    let text = match event {
2603                        LanguageModelCompletionEvent::Text(text) => text,
2604                        _ => continue,
2605                    };
2606
2607                    let mut lines = text.lines();
2608                    title.extend(lines.next());
2609
2610                    // Stop if the LLM generated multiple lines.
2611                    if lines.next().is_some() {
2612                        break;
2613                    }
2614                }
2615                anyhow::Ok(())
2616            };
2617
2618            if generate
2619                .await
2620                .context("failed to generate thread title")
2621                .log_err()
2622                .is_some()
2623            {
2624                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
2625            } else {
2626                // Emit TitleUpdated even on failure so that the propagation
2627                // chain (agent::Thread → NativeAgent → AcpThread) fires and
2628                // clears any provisional title that was set before the turn.
2629                _ = this.update(cx, |_, cx| {
2630                    cx.emit(TitleUpdated);
2631                    cx.notify();
2632                });
2633            }
2634            _ = this.update(cx, |this, _| this.pending_title_generation = None);
2635        }));
2636    }
2637
2638    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
2639        self.pending_title_generation = None;
2640        if Some(&title) != self.title.as_ref() {
2641            self.title = Some(title);
2642            cx.emit(TitleUpdated);
2643            cx.notify();
2644        }
2645    }
2646
    /// Discards the cached summary and drops any in-flight summary generation
    /// task, so the next `summary` call starts from scratch.
    fn clear_summary(&mut self) {
        self.summary = None;
        self.pending_summary_generation = None;
    }
2651
2652    fn last_user_message(&self) -> Option<&UserMessage> {
2653        self.messages
2654            .iter()
2655            .rev()
2656            .find_map(|message| match message {
2657                Message::User(user_message) => Some(user_message),
2658                Message::Agent(_) => None,
2659                Message::Resume => None,
2660            })
2661    }
2662
2663    fn pending_message(&mut self) -> &mut AgentMessage {
2664        self.pending_message.get_or_insert_default()
2665    }
2666
2667    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
2668        let Some(mut message) = self.pending_message.take() else {
2669            return;
2670        };
2671
2672        if message.content.is_empty() {
2673            return;
2674        }
2675
2676        for content in &message.content {
2677            let AgentMessageContent::ToolUse(tool_use) = content else {
2678                continue;
2679            };
2680
2681            if !message.tool_results.contains_key(&tool_use.id) {
2682                message.tool_results.insert(
2683                    tool_use.id.clone(),
2684                    LanguageModelToolResult {
2685                        tool_use_id: tool_use.id.clone(),
2686                        tool_name: tool_use.name.clone(),
2687                        is_error: true,
2688                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
2689                        output: None,
2690                    },
2691                );
2692            }
2693        }
2694
2695        self.messages.push(Message::Agent(message));
2696        self.updated_at = Utc::now();
2697        self.clear_summary();
2698        cx.notify()
2699    }
2700
2701    pub(crate) fn build_completion_request(
2702        &self,
2703        completion_intent: CompletionIntent,
2704        cx: &App,
2705    ) -> Result<LanguageModelRequest> {
2706        let completion_intent =
2707            if self.is_subagent() && completion_intent == CompletionIntent::UserPrompt {
2708                CompletionIntent::Subagent
2709            } else {
2710                completion_intent
2711            };
2712
2713        let model = self.model().context("No language model configured")?;
2714        let tools = if let Some(turn) = self.running_turn.as_ref() {
2715            turn.tools
2716                .iter()
2717                .filter_map(|(tool_name, tool)| {
2718                    log::trace!("Including tool: {}", tool_name);
2719                    Some(LanguageModelRequestTool {
2720                        name: tool_name.to_string(),
2721                        description: tool.description().to_string(),
2722                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
2723                        use_input_streaming: tool.supports_input_streaming(),
2724                    })
2725                })
2726                .collect::<Vec<_>>()
2727        } else {
2728            Vec::new()
2729        };
2730
2731        log::debug!("Building completion request");
2732        log::debug!("Completion intent: {:?}", completion_intent);
2733
2734        let available_tools: Vec<_> = self
2735            .running_turn
2736            .as_ref()
2737            .map(|turn| turn.tools.keys().cloned().collect())
2738            .unwrap_or_default();
2739
2740        log::debug!("Request includes {} tools", available_tools.len());
2741        let messages = self.build_request_messages(available_tools, cx);
2742        log::debug!("Request will include {} messages", messages.len());
2743
2744        let request = LanguageModelRequest {
2745            thread_id: Some(self.id.to_string()),
2746            prompt_id: Some(self.prompt_id.to_string()),
2747            intent: Some(completion_intent),
2748            messages,
2749            tools,
2750            tool_choice: None,
2751            stop: Vec::new(),
2752            temperature: AgentSettings::temperature_for_model(model, cx),
2753            thinking_allowed: self.thinking_enabled,
2754            thinking_effort: self.thinking_effort.clone(),
2755            speed: self.speed(),
2756        };
2757
2758        log::debug!("Completion request built successfully");
2759        Ok(request)
2760    }
2761
2762    fn enabled_tools(&self, cx: &App) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
2763        let Some(model) = self.model.as_ref() else {
2764            return BTreeMap::new();
2765        };
2766        let Some(profile) = AgentSettings::get_global(cx).profiles.get(&self.profile_id) else {
2767            return BTreeMap::new();
2768        };
2769        fn truncate(tool_name: &SharedString) -> SharedString {
2770            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
2771                let mut truncated = tool_name.to_string();
2772                truncated.truncate(MAX_TOOL_NAME_LENGTH);
2773                truncated.into()
2774            } else {
2775                tool_name.clone()
2776            }
2777        }
2778
2779        let use_streaming_edit_tool =
2780            cx.has_flag::<StreamingEditFileToolFeatureFlag>() && model.supports_streaming_tools();
2781
2782        let mut tools = self
2783            .tools
2784            .iter()
2785            .filter_map(|(tool_name, tool)| {
2786                // For streaming_edit_file, check profile against "edit_file" since that's what users configure
2787                let profile_tool_name = if tool_name == StreamingEditFileTool::NAME {
2788                    EditFileTool::NAME
2789                } else {
2790                    tool_name.as_ref()
2791                };
2792
2793                if tool.supports_provider(&model.provider_id())
2794                    && profile.is_tool_enabled(profile_tool_name)
2795                {
2796                    match (tool_name.as_ref(), use_streaming_edit_tool) {
2797                        (StreamingEditFileTool::NAME, false) | (EditFileTool::NAME, true) => None,
2798                        (StreamingEditFileTool::NAME, true) => {
2799                            // Expose streaming tool as "edit_file"
2800                            Some((SharedString::from(EditFileTool::NAME), tool.clone()))
2801                        }
2802                        _ => Some((truncate(tool_name), tool.clone())),
2803                    }
2804                } else {
2805                    None
2806                }
2807            })
2808            .collect::<BTreeMap<_, _>>();
2809
2810        let mut context_server_tools = Vec::new();
2811        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
2812        let mut duplicate_tool_names = HashSet::default();
2813        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
2814            for (tool_name, tool) in server_tools {
2815                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
2816                    let tool_name = truncate(tool_name);
2817                    if !seen_tools.insert(tool_name.clone()) {
2818                        duplicate_tool_names.insert(tool_name.clone());
2819                    }
2820                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
2821                }
2822            }
2823        }
2824
2825        // When there are duplicate tool names, disambiguate by prefixing them
2826        // with the server ID (converted to snake_case for API compatibility).
2827        // In the rare case there isn't enough space for the disambiguated tool
2828        // name, keep only the last tool with this name.
2829        for (server_id, tool_name, tool) in context_server_tools {
2830            if duplicate_tool_names.contains(&tool_name) {
2831                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
2832                if available >= 2 {
2833                    let mut disambiguated = server_id.0.to_snake_case();
2834                    disambiguated.truncate(available - 1);
2835                    disambiguated.push('_');
2836                    disambiguated.push_str(&tool_name);
2837                    tools.insert(disambiguated.into(), tool.clone());
2838                } else {
2839                    tools.insert(tool_name, tool.clone());
2840                }
2841            } else {
2842                tools.insert(tool_name, tool.clone());
2843            }
2844        }
2845
2846        tools
2847    }
2848
2849    fn refresh_turn_tools(&mut self, cx: &App) {
2850        let tools = self.enabled_tools(cx);
2851        if let Some(turn) = self.running_turn.as_mut() {
2852            turn.tools = tools;
2853        }
2854    }
2855
2856    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
2857        self.running_turn.as_ref()?.tools.get(name).cloned()
2858    }
2859
2860    pub fn has_tool(&self, name: &str) -> bool {
2861        self.running_turn
2862            .as_ref()
2863            .is_some_and(|turn| turn.tools.contains_key(name))
2864    }
2865
    /// Returns `true` when a tool named `name` is registered on this thread,
    /// regardless of whether a turn is running. Test-support only.
    #[cfg(any(test, feature = "test-support"))]
    pub fn has_registered_tool(&self, name: &str) -> bool {
        self.tools.contains_key(name)
    }
2870
    /// Returns the names of all tools registered on this thread.
    pub fn registered_tool_names(&self) -> Vec<SharedString> {
        self.tools.keys().cloned().collect()
    }
2874
    /// Adds `subagent` to the list of running subagents tracked by this thread.
    pub(crate) fn register_running_subagent(&mut self, subagent: WeakEntity<Thread>) {
        self.running_subagents.push(subagent);
    }
2878
2879    pub(crate) fn unregister_running_subagent(
2880        &mut self,
2881        subagent_session_id: &acp::SessionId,
2882        cx: &App,
2883    ) {
2884        self.running_subagents.retain(|s| {
2885            s.upgrade()
2886                .map_or(false, |s| s.read(cx).id() != subagent_session_id)
2887        });
2888    }
2889
2890    #[cfg(any(test, feature = "test-support"))]
2891    pub fn running_subagent_ids(&self, cx: &App) -> Vec<acp::SessionId> {
2892        self.running_subagents
2893            .iter()
2894            .filter_map(|s| s.upgrade().map(|s| s.read(cx).id().clone()))
2895            .collect()
2896    }
2897
    /// Returns `true` when this thread has a subagent context set.
    pub fn is_subagent(&self) -> bool {
        self.subagent_context.is_some()
    }
2901
2902    pub fn parent_thread_id(&self) -> Option<acp::SessionId> {
2903        self.subagent_context
2904            .as_ref()
2905            .map(|c| c.parent_thread_id.clone())
2906    }
2907
2908    pub fn depth(&self) -> u8 {
2909        self.subagent_context.as_ref().map(|c| c.depth).unwrap_or(0)
2910    }
2911
    /// Replaces this thread's subagent context with `context`.
    /// Test-support only.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_subagent_context(&mut self, context: SubagentContext) {
        self.subagent_context = Some(context);
    }
2916
    /// Returns `true` when no turn is currently running.
    pub fn is_turn_complete(&self) -> bool {
        self.running_turn.is_none()
    }
2920
2921    fn build_request_messages(
2922        &self,
2923        available_tools: Vec<SharedString>,
2924        cx: &App,
2925    ) -> Vec<LanguageModelRequestMessage> {
2926        log::trace!(
2927            "Building request messages from {} thread messages",
2928            self.messages.len()
2929        );
2930
2931        let system_prompt = SystemPromptTemplate {
2932            project: self.project_context.read(cx),
2933            available_tools,
2934            available_skills: self.available_skills.read(cx).formatted().to_string(),
2935            model_name: self.model.as_ref().map(|m| m.name().0.to_string()),
2936        }
2937        .render(&self.templates)
2938        .context("failed to build system prompt")
2939        .expect("Invalid template");
2940        let mut messages = vec![LanguageModelRequestMessage {
2941            role: Role::System,
2942            content: vec![system_prompt.into()],
2943            cache: false,
2944            reasoning_details: None,
2945        }];
2946        for message in &self.messages {
2947            messages.extend(message.to_request());
2948        }
2949
2950        if let Some(last_message) = messages.last_mut() {
2951            last_message.cache = true;
2952        }
2953
2954        if let Some(message) = self.pending_message.as_ref() {
2955            messages.extend(message.to_request());
2956        }
2957
2958        messages
2959    }
2960
2961    pub fn to_markdown(&self) -> String {
2962        let mut markdown = String::new();
2963        for (ix, message) in self.messages.iter().enumerate() {
2964            if ix > 0 {
2965                markdown.push('\n');
2966            }
2967            match message {
2968                Message::User(_) => markdown.push_str("## User\n\n"),
2969                Message::Agent(_) => markdown.push_str("## Assistant\n\n"),
2970                Message::Resume => {}
2971            }
2972            markdown.push_str(&message.to_markdown());
2973        }
2974
2975        if let Some(message) = self.pending_message.as_ref() {
2976            markdown.push_str("\n## Assistant\n\n");
2977            markdown.push_str(&message.to_markdown());
2978        }
2979
2980        markdown
2981    }
2982
    /// Replaces the current prompt id with a newly created one.
    fn advance_prompt_id(&mut self) {
        self.prompt_id = PromptId::new();
    }
2986
2987    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
2988        use LanguageModelCompletionError::*;
2989        use http_client::StatusCode;
2990
2991        // General strategy here:
2992        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
2993        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to 4 times with exponential backoff.
2994        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry up to 3 times.
2995        match error {
2996            HttpResponseError {
2997                status_code: StatusCode::TOO_MANY_REQUESTS,
2998                ..
2999            } => Some(RetryStrategy::ExponentialBackoff {
3000                initial_delay: BASE_RETRY_DELAY,
3001                max_attempts: MAX_RETRY_ATTEMPTS,
3002            }),
3003            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
3004                Some(RetryStrategy::Fixed {
3005                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
3006                    max_attempts: MAX_RETRY_ATTEMPTS,
3007                })
3008            }
3009            UpstreamProviderError {
3010                status,
3011                retry_after,
3012                ..
3013            } => match *status {
3014                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
3015                    Some(RetryStrategy::Fixed {
3016                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
3017                        max_attempts: MAX_RETRY_ATTEMPTS,
3018                    })
3019                }
3020                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
3021                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
3022                    // Internal Server Error could be anything, retry up to 3 times.
3023                    max_attempts: 3,
3024                }),
3025                status => {
3026                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
3027                    // but we frequently get them in practice. See https://http.dev/529
3028                    if status.as_u16() == 529 {
3029                        Some(RetryStrategy::Fixed {
3030                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
3031                            max_attempts: MAX_RETRY_ATTEMPTS,
3032                        })
3033                    } else {
3034                        Some(RetryStrategy::Fixed {
3035                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
3036                            max_attempts: 2,
3037                        })
3038                    }
3039                }
3040            },
3041            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
3042                delay: BASE_RETRY_DELAY,
3043                max_attempts: 3,
3044            }),
3045            ApiReadResponseError { .. }
3046            | HttpSend { .. }
3047            | DeserializeResponse { .. }
3048            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
3049                delay: BASE_RETRY_DELAY,
3050                max_attempts: 3,
3051            }),
3052            // Retrying these errors definitely shouldn't help.
3053            HttpResponseError {
3054                status_code:
3055                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
3056                ..
3057            }
3058            | AuthenticationError { .. }
3059            | PermissionError { .. }
3060            | NoApiKey { .. }
3061            | ApiEndpointNotFound { .. }
3062            | PromptTooLarge { .. } => None,
3063            // These errors might be transient, so retry them
3064            SerializeRequest { .. } | BuildRequestBody { .. } | StreamEndedUnexpectedly { .. } => {
3065                Some(RetryStrategy::Fixed {
3066                    delay: BASE_RETRY_DELAY,
3067                    max_attempts: 1,
3068                })
3069            }
3070            // Retry all other 4xx and 5xx errors once.
3071            HttpResponseError { status_code, .. }
3072                if status_code.is_client_error() || status_code.is_server_error() =>
3073            {
3074                Some(RetryStrategy::Fixed {
3075                    delay: BASE_RETRY_DELAY,
3076                    max_attempts: 3,
3077                })
3078            }
3079            Other(err) if err.is::<language_model::PaymentRequiredError>() => {
3080                // Retrying won't help for Payment Required errors.
3081                None
3082            }
            // We can't tell whether any other error is retryable, so be conservative
            // and only allow a small, fixed number of attempts.
3084            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
3085                delay: BASE_RETRY_DELAY,
3086                max_attempts: 2,
3087            }),
3088        }
3089    }
3090}
3091
3092struct RunningTurn {
3093    /// Holds the task that handles agent interaction until the end of the turn.
3094    /// Survives across multiple requests as the model performs tool calls and
3095    /// we run tools, report their results.
3096    _task: Task<()>,
3097    /// The current event stream for the running turn. Used to report a final
3098    /// cancellation event if we cancel the turn.
3099    event_stream: ThreadEventStream,
3100    /// The tools that are enabled for the current iteration of the turn.
3101    /// Refreshed at the start of each iteration via `refresh_turn_tools`.
3102    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
3103    /// Sender to signal tool cancellation. When cancel is called, this is
3104    /// set to true so all tools can detect user-initiated cancellation.
3105    cancellation_tx: watch::Sender<bool>,
3106    /// Senders for tools that support input streaming and have already been
3107    /// started but are still receiving input from the LLM.
3108    streaming_tool_inputs: HashMap<LanguageModelToolUseId, ToolInputSender>,
3109}
3110
3111impl RunningTurn {
3112    fn cancel(mut self) -> Task<()> {
3113        log::debug!("Cancelling in progress turn");
3114        self.cancellation_tx.send(true).ok();
3115        self.event_stream.send_canceled();
3116        self._task
3117    }
3118}
3119
3120pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);
3121
3122impl EventEmitter<TokenUsageUpdated> for Thread {}
3123
3124pub struct TitleUpdated;
3125
3126impl EventEmitter<TitleUpdated> for Thread {}
3127
3128/// A channel-based wrapper that delivers tool input to a running tool.
3129///
3130/// For non-streaming tools, created via `ToolInput::ready()` so `.recv()` resolves immediately.
3131/// For streaming tools, partial JSON snapshots arrive via `.recv_partial()` as the LLM streams
3132/// them, followed by the final complete input available through `.recv()`.
3133pub struct ToolInput<T> {
3134    partial_rx: mpsc::UnboundedReceiver<serde_json::Value>,
3135    final_rx: oneshot::Receiver<serde_json::Value>,
3136    _phantom: PhantomData<T>,
3137}
3138
3139impl<T: DeserializeOwned> ToolInput<T> {
3140    #[cfg(any(test, feature = "test-support"))]
3141    pub fn resolved(input: impl Serialize) -> Self {
3142        let value = serde_json::to_value(input).expect("failed to serialize tool input");
3143        Self::ready(value)
3144    }
3145
3146    pub fn ready(value: serde_json::Value) -> Self {
3147        let (partial_tx, partial_rx) = mpsc::unbounded();
3148        drop(partial_tx);
3149        let (final_tx, final_rx) = oneshot::channel();
3150        final_tx.send(value).ok();
3151        Self {
3152            partial_rx,
3153            final_rx,
3154            _phantom: PhantomData,
3155        }
3156    }
3157
3158    #[cfg(any(test, feature = "test-support"))]
3159    pub fn test() -> (ToolInputSender, Self) {
3160        let (sender, input) = ToolInputSender::channel();
3161        (sender, input.cast())
3162    }
3163
3164    /// Wait for the final deserialized input, ignoring all partial updates.
3165    /// Non-streaming tools can use this to wait until the whole input is available.
3166    pub async fn recv(mut self) -> Result<T> {
3167        // Drain any remaining partials
3168        while self.partial_rx.next().await.is_some() {}
3169        let value = self
3170            .final_rx
3171            .await
3172            .map_err(|_| anyhow!("tool input was not fully received"))?;
3173        serde_json::from_value(value).map_err(Into::into)
3174    }
3175
3176    /// Returns the next partial JSON snapshot, or `None` when input is complete.
3177    /// Once this returns `None`, call `recv()` to get the final input.
3178    pub async fn recv_partial(&mut self) -> Option<serde_json::Value> {
3179        self.partial_rx.next().await
3180    }
3181
3182    fn cast<U: DeserializeOwned>(self) -> ToolInput<U> {
3183        ToolInput {
3184            partial_rx: self.partial_rx,
3185            final_rx: self.final_rx,
3186            _phantom: PhantomData,
3187        }
3188    }
3189}
3190
3191pub struct ToolInputSender {
3192    partial_tx: mpsc::UnboundedSender<serde_json::Value>,
3193    final_tx: Option<oneshot::Sender<serde_json::Value>>,
3194}
3195
3196impl ToolInputSender {
3197    pub(crate) fn channel() -> (Self, ToolInput<serde_json::Value>) {
3198        let (partial_tx, partial_rx) = mpsc::unbounded();
3199        let (final_tx, final_rx) = oneshot::channel();
3200        let sender = Self {
3201            partial_tx,
3202            final_tx: Some(final_tx),
3203        };
3204        let input = ToolInput {
3205            partial_rx,
3206            final_rx,
3207            _phantom: PhantomData,
3208        };
3209        (sender, input)
3210    }
3211
3212    pub(crate) fn has_received_final(&self) -> bool {
3213        self.final_tx.is_none()
3214    }
3215
3216    pub(crate) fn send_partial(&self, value: serde_json::Value) {
3217        self.partial_tx.unbounded_send(value).ok();
3218    }
3219
3220    pub(crate) fn send_final(mut self, value: serde_json::Value) {
3221        // Close the partial channel so recv_partial() returns None
3222        self.partial_tx.close_channel();
3223        if let Some(final_tx) = self.final_tx.take() {
3224            final_tx.send(value).ok();
3225        }
3226    }
3227}
3228
3229pub trait AgentTool
3230where
3231    Self: 'static + Sized,
3232{
3233    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
3234    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
3235
3236    const NAME: &'static str;
3237
3238    fn description() -> SharedString {
3239        let schema = schemars::schema_for!(Self::Input);
3240        SharedString::new(
3241            schema
3242                .get("description")
3243                .and_then(|description| description.as_str())
3244                .unwrap_or_default(),
3245        )
3246    }
3247
3248    fn kind() -> acp::ToolKind;
3249
3250    /// The initial tool title to display. Can be updated during the tool run.
3251    fn initial_title(
3252        &self,
3253        input: Result<Self::Input, serde_json::Value>,
3254        cx: &mut App,
3255    ) -> SharedString;
3256
3257    /// Returns the JSON schema that describes the tool's input.
3258    fn input_schema(format: LanguageModelToolSchemaFormat) -> Schema {
3259        language_model::tool_schema::root_schema_for::<Self::Input>(format)
3260    }
3261
3262    /// Returns whether the tool supports streaming of tool use parameters.
3263    fn supports_input_streaming() -> bool {
3264        false
3265    }
3266
3267    /// Some tools rely on a provider for the underlying billing or other reasons.
3268    /// Allow the tool to check if they are compatible, or should be filtered out.
3269    fn supports_provider(_provider: &LanguageModelProviderId) -> bool {
3270        true
3271    }
3272
3273    /// Runs the tool with the provided input.
3274    ///
3275    /// Returns `Result<Self::Output, Self::Output>` rather than `Result<Self::Output, anyhow::Error>`
3276    /// because tool errors are sent back to the model as tool results. This means error output must
3277    /// be structured and readable by the agent — not an arbitrary `anyhow::Error`. Returning the
3278    /// same `Output` type for both success and failure lets tools provide structured data while
3279    /// still signaling whether the invocation succeeded or failed.
3280    fn run(
3281        self: Arc<Self>,
3282        input: ToolInput<Self::Input>,
3283        event_stream: ToolCallEventStream,
3284        cx: &mut App,
3285    ) -> Task<Result<Self::Output, Self::Output>>;
3286
3287    /// Emits events for a previous execution of the tool.
3288    fn replay(
3289        &self,
3290        _input: Self::Input,
3291        _output: Self::Output,
3292        _event_stream: ToolCallEventStream,
3293        _cx: &mut App,
3294    ) -> Result<()> {
3295        Ok(())
3296    }
3297
3298    fn erase(self) -> Arc<dyn AnyAgentTool> {
3299        Arc::new(Erased(Arc::new(self)))
3300    }
3301}
3302
/// Newtype wrapper that `AgentTool::erase` uses to expose a typed tool as an `AnyAgentTool`.
pub struct Erased<T>(T);
3304
3305pub struct AgentToolOutput {
3306    pub llm_output: LanguageModelToolResultContent,
3307    pub raw_output: serde_json::Value,
3308}
3309
3310impl AgentToolOutput {
3311    pub fn from_error(message: impl Into<String>) -> Self {
3312        let message = message.into();
3313        let llm_output = LanguageModelToolResultContent::Text(Arc::from(message.as_str()));
3314        Self {
3315            raw_output: serde_json::Value::String(message),
3316            llm_output,
3317        }
3318    }
3319}
3320
3321pub trait AnyAgentTool {
3322    fn name(&self) -> SharedString;
3323    fn description(&self) -> SharedString;
3324    fn kind(&self) -> acp::ToolKind;
3325    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
3326    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
3327    fn supports_input_streaming(&self) -> bool {
3328        false
3329    }
3330    fn supports_provider(&self, _provider: &LanguageModelProviderId) -> bool {
3331        true
3332    }
3333    /// See [`AgentTool::run`] for why this returns `Result<AgentToolOutput, AgentToolOutput>`.
3334    fn run(
3335        self: Arc<Self>,
3336        input: ToolInput<serde_json::Value>,
3337        event_stream: ToolCallEventStream,
3338        cx: &mut App,
3339    ) -> Task<Result<AgentToolOutput, AgentToolOutput>>;
3340    fn replay(
3341        &self,
3342        input: serde_json::Value,
3343        output: serde_json::Value,
3344        event_stream: ToolCallEventStream,
3345        cx: &mut App,
3346    ) -> Result<()>;
3347}
3348
3349impl<T> AnyAgentTool for Erased<Arc<T>>
3350where
3351    T: AgentTool,
3352{
3353    fn name(&self) -> SharedString {
3354        T::NAME.into()
3355    }
3356
3357    fn description(&self) -> SharedString {
3358        T::description()
3359    }
3360
3361    fn kind(&self) -> agent_client_protocol::ToolKind {
3362        T::kind()
3363    }
3364
3365    fn supports_input_streaming(&self) -> bool {
3366        T::supports_input_streaming()
3367    }
3368
3369    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
3370        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
3371        self.0.initial_title(parsed_input, _cx)
3372    }
3373
3374    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
3375        let mut json = serde_json::to_value(T::input_schema(format))?;
3376        language_model::tool_schema::adapt_schema_to_format(&mut json, format)?;
3377        Ok(json)
3378    }
3379
3380    fn supports_provider(&self, provider: &LanguageModelProviderId) -> bool {
3381        T::supports_provider(provider)
3382    }
3383
3384    fn run(
3385        self: Arc<Self>,
3386        input: ToolInput<serde_json::Value>,
3387        event_stream: ToolCallEventStream,
3388        cx: &mut App,
3389    ) -> Task<Result<AgentToolOutput, AgentToolOutput>> {
3390        let tool_input: ToolInput<T::Input> = input.cast();
3391        let task = self.0.clone().run(tool_input, event_stream, cx);
3392        cx.spawn(async move |_cx| match task.await {
3393            Ok(output) => {
3394                let raw_output = serde_json::to_value(&output).map_err(|e| {
3395                    AgentToolOutput::from_error(format!("Failed to serialize tool output: {e}"))
3396                })?;
3397                Ok(AgentToolOutput {
3398                    llm_output: output.into(),
3399                    raw_output,
3400                })
3401            }
3402            Err(error_output) => {
3403                let raw_output = serde_json::to_value(&error_output).unwrap_or_else(|e| {
3404                    log::error!("Failed to serialize tool error output: {e}");
3405                    serde_json::Value::Null
3406                });
3407                Err(AgentToolOutput {
3408                    llm_output: error_output.into(),
3409                    raw_output,
3410                })
3411            }
3412        })
3413    }
3414
3415    fn replay(
3416        &self,
3417        input: serde_json::Value,
3418        output: serde_json::Value,
3419        event_stream: ToolCallEventStream,
3420        cx: &mut App,
3421    ) -> Result<()> {
3422        let input = serde_json::from_value(input)?;
3423        let output = serde_json::from_value(output)?;
3424        self.0.replay(input, output, event_stream, cx)
3425    }
3426}
3427
3428#[derive(Clone)]
3429struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
3430
3431impl ThreadEventStream {
3432    fn send_user_message(&self, message: &UserMessage) {
3433        self.0
3434            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
3435            .ok();
3436    }
3437
3438    fn send_text(&self, text: &str) {
3439        self.0
3440            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
3441            .ok();
3442    }
3443
3444    fn send_thinking(&self, text: &str) {
3445        self.0
3446            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
3447            .ok();
3448    }
3449
3450    fn send_tool_call(
3451        &self,
3452        id: &LanguageModelToolUseId,
3453        tool_name: &str,
3454        title: SharedString,
3455        kind: acp::ToolKind,
3456        input: serde_json::Value,
3457    ) {
3458        self.0
3459            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
3460                id,
3461                tool_name,
3462                title.to_string(),
3463                kind,
3464                input,
3465            ))))
3466            .ok();
3467    }
3468
3469    fn initial_tool_call(
3470        id: &LanguageModelToolUseId,
3471        tool_name: &str,
3472        title: String,
3473        kind: acp::ToolKind,
3474        input: serde_json::Value,
3475    ) -> acp::ToolCall {
3476        acp::ToolCall::new(id.to_string(), title)
3477            .kind(kind)
3478            .raw_input(input)
3479            .meta(acp_thread::meta_with_tool_name(tool_name))
3480    }
3481
3482    fn update_tool_call_fields(
3483        &self,
3484        tool_use_id: &LanguageModelToolUseId,
3485        fields: acp::ToolCallUpdateFields,
3486        meta: Option<acp::Meta>,
3487    ) {
3488        self.0
3489            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
3490                acp::ToolCallUpdate::new(tool_use_id.to_string(), fields)
3491                    .meta(meta)
3492                    .into(),
3493            )))
3494            .ok();
3495    }
3496
3497    fn send_plan(&self, plan: acp::Plan) {
3498        self.0.unbounded_send(Ok(ThreadEvent::Plan(plan))).ok();
3499    }
3500
3501    fn send_retry(&self, status: acp_thread::RetryStatus) {
3502        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
3503    }
3504
3505    fn send_stop(&self, reason: acp::StopReason) {
3506        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
3507    }
3508
3509    fn send_canceled(&self) {
3510        self.0
3511            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
3512            .ok();
3513    }
3514
3515    fn send_error(&self, error: impl Into<anyhow::Error>) {
3516        self.0.unbounded_send(Err(error.into())).ok();
3517    }
3518}
3519
3520#[derive(Clone)]
3521pub struct ToolCallEventStream {
3522    tool_use_id: LanguageModelToolUseId,
3523    stream: ThreadEventStream,
3524    fs: Option<Arc<dyn Fs>>,
3525    cancellation_rx: watch::Receiver<bool>,
3526}
3527
3528impl ToolCallEventStream {
3529    #[cfg(any(test, feature = "test-support"))]
3530    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
3531        let (stream, receiver, _cancellation_tx) = Self::test_with_cancellation();
3532        (stream, receiver)
3533    }
3534
3535    #[cfg(any(test, feature = "test-support"))]
3536    pub fn test_with_cancellation() -> (Self, ToolCallEventStreamReceiver, watch::Sender<bool>) {
3537        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
3538        let (cancellation_tx, cancellation_rx) = watch::channel(false);
3539
3540        let stream = ToolCallEventStream::new(
3541            "test_id".into(),
3542            ThreadEventStream(events_tx),
3543            None,
3544            cancellation_rx,
3545        );
3546
3547        (
3548            stream,
3549            ToolCallEventStreamReceiver(events_rx),
3550            cancellation_tx,
3551        )
3552    }
3553
3554    /// Signal cancellation for this event stream. Only available in tests.
3555    #[cfg(any(test, feature = "test-support"))]
3556    pub fn signal_cancellation_with_sender(cancellation_tx: &mut watch::Sender<bool>) {
3557        cancellation_tx.send(true).ok();
3558    }
3559
3560    fn new(
3561        tool_use_id: LanguageModelToolUseId,
3562        stream: ThreadEventStream,
3563        fs: Option<Arc<dyn Fs>>,
3564        cancellation_rx: watch::Receiver<bool>,
3565    ) -> Self {
3566        Self {
3567            tool_use_id,
3568            stream,
3569            fs,
3570            cancellation_rx,
3571        }
3572    }
3573
3574    /// Returns a future that resolves when the user cancels the tool call.
3575    /// Tools should select on this alongside their main work to detect user cancellation.
3576    pub fn cancelled_by_user(&self) -> impl std::future::Future<Output = ()> + '_ {
3577        let mut rx = self.cancellation_rx.clone();
3578        async move {
3579            loop {
3580                if *rx.borrow() {
3581                    return;
3582                }
3583                if rx.changed().await.is_err() {
3584                    // Sender dropped, will never be cancelled
3585                    std::future::pending::<()>().await;
3586                }
3587            }
3588        }
3589    }
3590
3591    /// Returns true if the user has cancelled this tool call.
3592    /// This is useful for checking cancellation state after an operation completes,
3593    /// to determine if the completion was due to user cancellation.
3594    pub fn was_cancelled_by_user(&self) -> bool {
3595        *self.cancellation_rx.clone().borrow()
3596    }
3597
3598    pub fn tool_use_id(&self) -> &LanguageModelToolUseId {
3599        &self.tool_use_id
3600    }
3601
3602    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
3603        self.stream
3604            .update_tool_call_fields(&self.tool_use_id, fields, None);
3605    }
3606
3607    pub fn update_fields_with_meta(
3608        &self,
3609        fields: acp::ToolCallUpdateFields,
3610        meta: Option<acp::Meta>,
3611    ) {
3612        self.stream
3613            .update_tool_call_fields(&self.tool_use_id, fields, meta);
3614    }
3615
3616    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
3617        self.stream
3618            .0
3619            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
3620                acp_thread::ToolCallUpdateDiff {
3621                    id: acp::ToolCallId::new(self.tool_use_id.to_string()),
3622                    diff,
3623                }
3624                .into(),
3625            )))
3626            .ok();
3627    }
3628
3629    pub fn subagent_spawned(&self, id: acp::SessionId) {
3630        self.stream
3631            .0
3632            .unbounded_send(Ok(ThreadEvent::SubagentSpawned(id)))
3633            .ok();
3634    }
3635
3636    pub fn update_plan(&self, plan: acp::Plan) {
3637        self.stream.send_plan(plan);
3638    }
3639
3640    /// Authorize a third-party tool (e.g., MCP tool from a context server).
3641    ///
3642    /// Unlike built-in tools, third-party tools don't support pattern-based permissions.
3643    /// They only support `default` (allow/deny/confirm) per tool.
3644    ///
3645    /// Uses the dropdown authorization flow with two granularities:
3646    /// - "Always for <display_name> MCP tool" → sets `tools.<tool_id>.default = "allow"` or "deny"
3647    /// - "Only this time" → allow/deny once
3648    pub fn authorize_third_party_tool(
3649        &self,
3650        title: impl Into<String>,
3651        tool_id: String,
3652        display_name: String,
3653        cx: &mut App,
3654    ) -> Task<Result<()>> {
3655        let settings = agent_settings::AgentSettings::get_global(cx);
3656
3657        let decision = decide_permission_from_settings(&tool_id, &[String::new()], &settings);
3658
3659        match decision {
3660            ToolPermissionDecision::Allow => return Task::ready(Ok(())),
3661            ToolPermissionDecision::Deny(reason) => return Task::ready(Err(anyhow!(reason))),
3662            ToolPermissionDecision::Confirm => {}
3663        }
3664
3665        let (response_tx, response_rx) = oneshot::channel();
3666        if let Err(error) = self
3667            .stream
3668            .0
3669            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3670                ToolCallAuthorization {
3671                    tool_call: acp::ToolCallUpdate::new(
3672                        self.tool_use_id.to_string(),
3673                        acp::ToolCallUpdateFields::new().title(title.into()),
3674                    ),
3675                    options: acp_thread::PermissionOptions::Dropdown(vec![
3676                        acp_thread::PermissionOptionChoice {
3677                            allow: acp::PermissionOption::new(
3678                                acp::PermissionOptionId::new(format!(
3679                                    "always_allow_mcp:{}",
3680                                    tool_id
3681                                )),
3682                                format!("Always for {} MCP tool", display_name),
3683                                acp::PermissionOptionKind::AllowAlways,
3684                            ),
3685                            deny: acp::PermissionOption::new(
3686                                acp::PermissionOptionId::new(format!(
3687                                    "always_deny_mcp:{}",
3688                                    tool_id
3689                                )),
3690                                format!("Always for {} MCP tool", display_name),
3691                                acp::PermissionOptionKind::RejectAlways,
3692                            ),
3693                            sub_patterns: vec![],
3694                        },
3695                        acp_thread::PermissionOptionChoice {
3696                            allow: acp::PermissionOption::new(
3697                                acp::PermissionOptionId::new("allow"),
3698                                "Only this time",
3699                                acp::PermissionOptionKind::AllowOnce,
3700                            ),
3701                            deny: acp::PermissionOption::new(
3702                                acp::PermissionOptionId::new("deny"),
3703                                "Only this time",
3704                                acp::PermissionOptionKind::RejectOnce,
3705                            ),
3706                            sub_patterns: vec![],
3707                        },
3708                    ]),
3709                    response: response_tx,
3710                    context: None,
3711                },
3712            )))
3713        {
3714            log::error!("Failed to send tool call authorization: {error}");
3715            return Task::ready(Err(anyhow!(
3716                "Failed to send tool call authorization: {error}"
3717            )));
3718        }
3719
3720        let fs = self.fs.clone();
3721        cx.spawn(async move |cx| {
3722            let outcome = response_rx.await?;
3723            let is_allow = Self::persist_permission_outcome(&outcome, fs, &cx);
3724            if is_allow {
3725                Ok(())
3726            } else {
3727                Err(anyhow!("Permission to run tool denied by user"))
3728            }
3729        })
3730    }
3731
3732    pub fn authorize(
3733        &self,
3734        title: impl Into<String>,
3735        context: ToolPermissionContext,
3736        cx: &mut App,
3737    ) -> Task<Result<()>> {
3738        let options = context.build_permission_options();
3739
3740        let (response_tx, response_rx) = oneshot::channel();
3741        if let Err(error) = self
3742            .stream
3743            .0
3744            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
3745                ToolCallAuthorization {
3746                    tool_call: acp::ToolCallUpdate::new(
3747                        self.tool_use_id.to_string(),
3748                        acp::ToolCallUpdateFields::new().title(title.into()),
3749                    ),
3750                    options,
3751                    response: response_tx,
3752                    context: Some(context),
3753                },
3754            )))
3755        {
3756            log::error!("Failed to send tool call authorization: {error}");
3757            return Task::ready(Err(anyhow!(
3758                "Failed to send tool call authorization: {error}"
3759            )));
3760        }
3761
3762        let fs = self.fs.clone();
3763        cx.spawn(async move |cx| {
3764            let outcome = response_rx.await?;
3765            let is_allow = Self::persist_permission_outcome(&outcome, fs, &cx);
3766            if is_allow {
3767                Ok(())
3768            } else {
3769                Err(anyhow!("Permission to run tool denied by user"))
3770            }
3771        })
3772    }
3773
3774    /// Interprets a `SelectedPermissionOutcome` and persists any settings changes.
3775    /// Returns `true` if the tool call should be allowed, `false` if denied.
3776    fn persist_permission_outcome(
3777        outcome: &acp_thread::SelectedPermissionOutcome,
3778        fs: Option<Arc<dyn Fs>>,
3779        cx: &AsyncApp,
3780    ) -> bool {
3781        let option_id = outcome.option_id.0.as_ref();
3782
3783        let always_permission = option_id
3784            .strip_prefix("always_allow:")
3785            .map(|tool| (tool, ToolPermissionMode::Allow))
3786            .or_else(|| {
3787                option_id
3788                    .strip_prefix("always_deny:")
3789                    .map(|tool| (tool, ToolPermissionMode::Deny))
3790            })
3791            .or_else(|| {
3792                option_id
3793                    .strip_prefix("always_allow_mcp:")
3794                    .map(|tool| (tool, ToolPermissionMode::Allow))
3795            })
3796            .or_else(|| {
3797                option_id
3798                    .strip_prefix("always_deny_mcp:")
3799                    .map(|tool| (tool, ToolPermissionMode::Deny))
3800            });
3801
3802        if let Some((tool, mode)) = always_permission {
3803            let params = outcome.params.as_ref();
3804            Self::persist_always_permission(tool, mode, params, fs, cx);
3805            return mode == ToolPermissionMode::Allow;
3806        }
3807
3808        // Handle simple "allow" / "deny" (once, no persistence)
3809        if option_id == "allow" || option_id == "deny" {
3810            debug_assert!(
3811                outcome.params.is_none(),
3812                "unexpected params for once-only permission"
3813            );
3814            return option_id == "allow";
3815        }
3816
3817        debug_assert!(false, "unexpected permission option_id: {option_id}");
3818        false
3819    }
3820
3821    /// Persists an "always allow" or "always deny" permission, using sub_patterns
3822    /// from params when present.
3823    fn persist_always_permission(
3824        tool: &str,
3825        mode: ToolPermissionMode,
3826        params: Option<&acp_thread::SelectedPermissionParams>,
3827        fs: Option<Arc<dyn Fs>>,
3828        cx: &AsyncApp,
3829    ) {
3830        let Some(fs) = fs else {
3831            return;
3832        };
3833
3834        match params {
3835            Some(acp_thread::SelectedPermissionParams::Terminal {
3836                patterns: sub_patterns,
3837            }) => {
3838                debug_assert!(
3839                    !sub_patterns.is_empty(),
3840                    "empty sub_patterns for tool {tool} — callers should pass None instead"
3841                );
3842                let tool = tool.to_string();
3843                let sub_patterns = sub_patterns.clone();
3844                cx.update(|cx| {
3845                    update_settings_file(fs, cx, move |settings, _| {
3846                        let agent = settings.agent.get_or_insert_default();
3847                        for pattern in sub_patterns {
3848                            match mode {
3849                                ToolPermissionMode::Allow => {
3850                                    agent.add_tool_allow_pattern(&tool, pattern);
3851                                }
3852                                ToolPermissionMode::Deny => {
3853                                    agent.add_tool_deny_pattern(&tool, pattern);
3854                                }
3855                                // If there's no matching pattern this will
3856                                // default to confirm, so falling through is
3857                                // fine here.
3858                                ToolPermissionMode::Confirm => (),
3859                            }
3860                        }
3861                    });
3862                });
3863            }
3864            None => {
3865                let tool = tool.to_string();
3866                cx.update(|cx| {
3867                    update_settings_file(fs, cx, move |settings, _| {
3868                        settings
3869                            .agent
3870                            .get_or_insert_default()
3871                            .set_tool_default_permission(&tool, mode);
3872                    });
3873                });
3874            }
3875        }
3876    }
3877}
3878
/// Test-only receiving end of the events emitted through a `ToolCallEventStream`.
#[cfg(any(test, feature = "test-support"))]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
3881
// Each `expect_*` helper awaits the next item on the stream and panics unless it is the
// expected kind of event.
#[cfg(any(test, feature = "test-support"))]
impl ToolCallEventStreamReceiver {
    /// Awaits the next event and returns it as a tool call authorization request.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including end of stream.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) => auth,
            other => panic!("Expected ToolCallAuthorization but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the fields of a field-update tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including end of stream.
    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            )))) => update.fields,
            other => panic!("Expected update fields but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the diff of a diff tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including end of stream.
    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
                update,
            )))) => update.diff,
            other => panic!("Expected diff but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the terminal of a terminal tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including end of stream.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
                update,
            )))) => update.terminal,
            other => panic!("Expected terminal but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns it as a plan.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including end of stream.
    pub async fn expect_plan(&mut self) -> acp::Plan {
        match self.0.next().await {
            Some(Ok(ThreadEvent::Plan(plan))) => plan,
            other => panic!("Expected plan but got: {:?}", other),
        }
    }
}
3938
#[cfg(any(test, feature = "test-support"))]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    /// Exposes the wrapped receiver by shared reference.
    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
3947
#[cfg(any(test, feature = "test-support"))]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    /// Exposes the wrapped receiver by mutable reference.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
3954
3955impl From<&str> for UserMessageContent {
3956    fn from(text: &str) -> Self {
3957        Self::Text(text.into())
3958    }
3959}
3960
3961impl From<String> for UserMessageContent {
3962    fn from(text: String) -> Self {
3963        Self::Text(text)
3964    }
3965}
3966
3967impl UserMessageContent {
3968    pub fn from_content_block(value: acp::ContentBlock, path_style: PathStyle) -> Self {
3969        match value {
3970            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
3971            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
3972            acp::ContentBlock::Audio(_) => {
3973                // TODO
3974                Self::Text("[audio]".to_string())
3975            }
3976            acp::ContentBlock::ResourceLink(resource_link) => {
3977                match MentionUri::parse(&resource_link.uri, path_style) {
3978                    Ok(uri) => Self::Mention {
3979                        uri,
3980                        content: String::new(),
3981                    },
3982                    Err(err) => {
3983                        log::error!("Failed to parse mention link: {}", err);
3984                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
3985                    }
3986                }
3987            }
3988            acp::ContentBlock::Resource(resource) => match resource.resource {
3989                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
3990                    match MentionUri::parse(&resource.uri, path_style) {
3991                        Ok(uri) => Self::Mention {
3992                            uri,
3993                            content: resource.text,
3994                        },
3995                        Err(err) => {
3996                            log::error!("Failed to parse mention link: {}", err);
3997                            Self::Text(
3998                                MarkdownCodeBlock {
3999                                    tag: &resource.uri,
4000                                    text: &resource.text,
4001                                }
4002                                .to_string(),
4003                            )
4004                        }
4005                    }
4006                }
4007                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
4008                    // TODO
4009                    Self::Text("[blob]".to_string())
4010                }
4011                other => {
4012                    log::warn!("Unexpected content type: {:?}", other);
4013                    Self::Text("[unknown]".to_string())
4014                }
4015            },
4016            other => {
4017                log::warn!("Unexpected content type: {:?}", other);
4018                Self::Text("[unknown]".to_string())
4019            }
4020        }
4021    }
4022}
4023
4024impl From<UserMessageContent> for acp::ContentBlock {
4025    fn from(content: UserMessageContent) -> Self {
4026        match content {
4027            UserMessageContent::Text(text) => text.into(),
4028            UserMessageContent::Image(image) => {
4029                acp::ContentBlock::Image(acp::ImageContent::new(image.source, "image/png"))
4030            }
4031            UserMessageContent::Mention { uri, content } => acp::ContentBlock::Resource(
4032                acp::EmbeddedResource::new(acp::EmbeddedResourceResource::TextResourceContents(
4033                    acp::TextResourceContents::new(content, uri.to_uri().to_string()),
4034                )),
4035            ),
4036        }
4037    }
4038}
4039
4040fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
4041    LanguageModelImage {
4042        source: image_content.data.into(),
4043        size: None,
4044    }
4045}
4046
4047#[cfg(test)]
4048mod tests {
4049    use super::*;
4050    use gpui::TestAppContext;
4051    use language_model::LanguageModelToolUseId;
4052    use language_model::fake_provider::FakeLanguageModel;
4053    use serde_json::json;
4054    use std::sync::Arc;
4055
4056    async fn setup_thread_for_test(cx: &mut TestAppContext) -> (Entity<Thread>, ThreadEventStream) {
4057        cx.update(|cx| {
4058            let settings_store = settings::SettingsStore::test(cx);
4059            cx.set_global(settings_store);
4060        });
4061
4062        let fs = fs::FakeFs::new(cx.background_executor.clone());
4063        let templates = Templates::new();
4064        let project = Project::test(fs.clone(), [], cx).await;
4065
4066        cx.update(|cx| {
4067            let project_context = cx.new(|_cx| prompt_store::ProjectContext::default());
4068            let context_server_store = project.read(cx).context_server_store();
4069            let context_server_registry =
4070                cx.new(|cx| ContextServerRegistry::new(context_server_store, cx));
4071
4072            let thread = cx.new(|cx| {
4073                Thread::new(
4074                    project,
4075                    project_context,
4076                    context_server_registry,
4077                    templates,
4078                    None,
4079                    cx,
4080                )
4081            });
4082
4083            let (event_tx, _event_rx) = mpsc::unbounded();
4084            let event_stream = ThreadEventStream(event_tx);
4085
4086            (thread, event_stream)
4087        })
4088    }
4089
4090    fn setup_parent_with_subagents(
4091        cx: &mut TestAppContext,
4092        parent: &Entity<Thread>,
4093        count: usize,
4094    ) -> Vec<Entity<Thread>> {
4095        cx.update(|cx| {
4096            let mut subagents = Vec::new();
4097            for _ in 0..count {
4098                let subagent = cx.new(|cx| Thread::new_subagent(parent, cx));
4099                parent.update(cx, |thread, _cx| {
4100                    thread.register_running_subagent(subagent.downgrade());
4101                });
4102                subagents.push(subagent);
4103            }
4104            subagents
4105        })
4106    }
4107
4108    #[gpui::test]
4109    async fn test_set_model_propagates_to_subagents(cx: &mut TestAppContext) {
4110        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4111        let subagents = setup_parent_with_subagents(cx, &parent, 2);
4112
4113        let new_model: Arc<dyn LanguageModel> = Arc::new(FakeLanguageModel::with_id_and_thinking(
4114            "test-provider",
4115            "new-model",
4116            "New Model",
4117            false,
4118        ));
4119
4120        cx.update(|cx| {
4121            parent.update(cx, |thread, cx| {
4122                thread.set_model(new_model, cx);
4123            });
4124
4125            for subagent in &subagents {
4126                let subagent_model_id = subagent.read(cx).model().unwrap().id();
4127                assert_eq!(
4128                    subagent_model_id.0.as_ref(),
4129                    "new-model",
4130                    "Subagent model should match parent model after set_model"
4131                );
4132            }
4133        });
4134    }
4135
4136    #[gpui::test]
4137    async fn test_set_summarization_model_propagates_to_subagents(cx: &mut TestAppContext) {
4138        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4139        let subagents = setup_parent_with_subagents(cx, &parent, 2);
4140
4141        let summary_model: Arc<dyn LanguageModel> =
4142            Arc::new(FakeLanguageModel::with_id_and_thinking(
4143                "test-provider",
4144                "summary-model",
4145                "Summary Model",
4146                false,
4147            ));
4148
4149        cx.update(|cx| {
4150            parent.update(cx, |thread, cx| {
4151                thread.set_summarization_model(Some(summary_model), cx);
4152            });
4153
4154            for subagent in &subagents {
4155                let subagent_summary_id = subagent.read(cx).summarization_model().unwrap().id();
4156                assert_eq!(
4157                    subagent_summary_id.0.as_ref(),
4158                    "summary-model",
4159                    "Subagent summarization model should match parent after set_summarization_model"
4160                );
4161            }
4162        });
4163    }
4164
4165    #[gpui::test]
4166    async fn test_set_thinking_enabled_propagates_to_subagents(cx: &mut TestAppContext) {
4167        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4168        let subagents = setup_parent_with_subagents(cx, &parent, 2);
4169
4170        cx.update(|cx| {
4171            parent.update(cx, |thread, cx| {
4172                thread.set_thinking_enabled(true, cx);
4173            });
4174
4175            for subagent in &subagents {
4176                assert!(
4177                    subagent.read(cx).thinking_enabled(),
4178                    "Subagent thinking should be enabled after parent enables it"
4179                );
4180            }
4181
4182            parent.update(cx, |thread, cx| {
4183                thread.set_thinking_enabled(false, cx);
4184            });
4185
4186            for subagent in &subagents {
4187                assert!(
4188                    !subagent.read(cx).thinking_enabled(),
4189                    "Subagent thinking should be disabled after parent disables it"
4190                );
4191            }
4192        });
4193    }
4194
4195    #[gpui::test]
4196    async fn test_set_thinking_effort_propagates_to_subagents(cx: &mut TestAppContext) {
4197        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4198        let subagents = setup_parent_with_subagents(cx, &parent, 2);
4199
4200        cx.update(|cx| {
4201            parent.update(cx, |thread, cx| {
4202                thread.set_thinking_effort(Some("high".to_string()), cx);
4203            });
4204
4205            for subagent in &subagents {
4206                assert_eq!(
4207                    subagent.read(cx).thinking_effort().map(|s| s.as_str()),
4208                    Some("high"),
4209                    "Subagent thinking effort should match parent"
4210                );
4211            }
4212
4213            parent.update(cx, |thread, cx| {
4214                thread.set_thinking_effort(None, cx);
4215            });
4216
4217            for subagent in &subagents {
4218                assert_eq!(
4219                    subagent.read(cx).thinking_effort(),
4220                    None,
4221                    "Subagent thinking effort should be None after parent clears it"
4222                );
4223            }
4224        });
4225    }
4226
4227    #[gpui::test]
4228    async fn test_set_speed_propagates_to_subagents(cx: &mut TestAppContext) {
4229        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4230        let subagents = setup_parent_with_subagents(cx, &parent, 2);
4231
4232        cx.update(|cx| {
4233            parent.update(cx, |thread, cx| {
4234                thread.set_speed(Speed::Fast, cx);
4235            });
4236
4237            for subagent in &subagents {
4238                assert_eq!(
4239                    subagent.read(cx).speed(),
4240                    Some(Speed::Fast),
4241                    "Subagent speed should match parent after set_speed"
4242                );
4243            }
4244        });
4245    }
4246
4247    #[gpui::test]
4248    async fn test_dropped_subagent_does_not_panic(cx: &mut TestAppContext) {
4249        let (parent, _event_stream) = setup_thread_for_test(cx).await;
4250        let subagents = setup_parent_with_subagents(cx, &parent, 1);
4251
4252        // Drop the subagent so the WeakEntity can no longer be upgraded
4253        drop(subagents);
4254
4255        // Should not panic even though the subagent was dropped
4256        cx.update(|cx| {
4257            parent.update(cx, |thread, cx| {
4258                thread.set_thinking_enabled(true, cx);
4259                thread.set_speed(Speed::Fast, cx);
4260                thread.set_thinking_effort(Some("high".to_string()), cx);
4261            });
4262        });
4263    }
4264
4265    #[gpui::test]
4266    async fn test_handle_tool_use_json_parse_error_adds_tool_use_to_content(
4267        cx: &mut TestAppContext,
4268    ) {
4269        let (thread, event_stream) = setup_thread_for_test(cx).await;
4270
4271        cx.update(|cx| {
4272            thread.update(cx, |thread, _cx| {
4273                let tool_use_id = LanguageModelToolUseId::from("test_tool_id");
4274                let tool_name: Arc<str> = Arc::from("test_tool");
4275                let raw_input: Arc<str> = Arc::from("{invalid json");
4276                let json_parse_error = "expected value at line 1 column 1".to_string();
4277
4278                // Call the function under test
4279                let result = thread.handle_tool_use_json_parse_error_event(
4280                    tool_use_id.clone(),
4281                    tool_name.clone(),
4282                    raw_input.clone(),
4283                    json_parse_error,
4284                    &event_stream,
4285                );
4286
4287                // Verify the result is an error
4288                assert!(result.is_error);
4289                assert_eq!(result.tool_use_id, tool_use_id);
4290                assert_eq!(result.tool_name, tool_name);
4291                assert!(matches!(
4292                    result.content,
4293                    LanguageModelToolResultContent::Text(_)
4294                ));
4295
4296                // Verify the tool use was added to the message content
4297                {
4298                    let last_message = thread.pending_message();
4299                    assert_eq!(
4300                        last_message.content.len(),
4301                        1,
4302                        "Should have one tool_use in content"
4303                    );
4304
4305                    match &last_message.content[0] {
4306                        AgentMessageContent::ToolUse(tool_use) => {
4307                            assert_eq!(tool_use.id, tool_use_id);
4308                            assert_eq!(tool_use.name, tool_name);
4309                            assert_eq!(tool_use.raw_input, raw_input.to_string());
4310                            assert!(tool_use.is_input_complete);
4311                            // Should fall back to empty object for invalid JSON
4312                            assert_eq!(tool_use.input, json!({}));
4313                        }
4314                        _ => panic!("Expected ToolUse content"),
4315                    }
4316                }
4317
4318                // Insert the tool result (simulating what the caller does)
4319                thread
4320                    .pending_message()
4321                    .tool_results
4322                    .insert(result.tool_use_id.clone(), result);
4323
4324                // Verify the tool result was added
4325                let last_message = thread.pending_message();
4326                assert_eq!(
4327                    last_message.tool_results.len(),
4328                    1,
4329                    "Should have one tool_result"
4330                );
4331                assert!(last_message.tool_results.contains_key(&tool_use_id));
4332            });
4333        });
4334    }
4335}