1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ProjectSnapshot, ReadFileTool,
   5    SystemPromptTemplate, Template, Templates, TerminalTool, ThinkingTool, WebSearchTool,
   6};
   7use acp_thread::{MentionUri, UserMessageId};
   8use action_log::ActionLog;
   9
  10use agent_client_protocol as acp;
  11use agent_settings::{
  12    AgentProfileId, AgentProfileSettings, AgentSettings, CompletionMode,
  13    SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
  14};
  15use anyhow::{Context as _, Result, anyhow};
  16use chrono::{DateTime, Utc};
  17use client::{ModelRequestUsage, RequestUsage, UserStore};
  18use cloud_llm_client::{CompletionIntent, CompletionRequestStatus, Plan, UsageLimit};
  19use collections::{HashMap, HashSet, IndexMap};
  20use fs::Fs;
  21use futures::stream;
  22use futures::{
  23    FutureExt,
  24    channel::{mpsc, oneshot},
  25    future::Shared,
  26    stream::FuturesUnordered,
  27};
  28use gpui::{
  29    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  30};
  31use language_model::{
  32    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelExt,
  33    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  34    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  35    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  36    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage, ZED_CLOUD_PROVIDER_ID,
  37};
  38use project::Project;
  39use prompt_store::ProjectContext;
  40use schemars::{JsonSchema, Schema};
  41use serde::{Deserialize, Serialize};
  42use settings::{Settings, update_settings_file};
  43use smol::stream::StreamExt;
  44use std::{
  45    collections::BTreeMap,
  46    ops::RangeInclusive,
  47    path::Path,
  48    rc::Rc,
  49    sync::Arc,
  50    time::{Duration, Instant},
  51};
  52use std::{fmt::Write, path::PathBuf};
  53use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock, paths::PathStyle};
  54use uuid::Uuid;
  55
  56const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
  57pub const MAX_TOOL_NAME_LENGTH: usize = 64;
  58
  59/// The ID of the user prompt that initiated a request.
  60///
  61/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
  62#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  63pub struct PromptId(Arc<str>);
  64
  65impl PromptId {
  66    pub fn new() -> Self {
  67        Self(Uuid::new_v4().to_string().into())
  68    }
  69}
  70
  71impl std::fmt::Display for PromptId {
  72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  73        write!(f, "{}", self.0)
  74    }
  75}
  76
  77pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
  78pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);
  79
  80#[derive(Debug, Clone)]
  81enum RetryStrategy {
  82    ExponentialBackoff {
  83        initial_delay: Duration,
  84        max_attempts: u8,
  85    },
  86    Fixed {
  87        delay: Duration,
  88        max_attempts: u8,
  89    },
  90}
  91
  92#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
  93pub enum Message {
  94    User(UserMessage),
  95    Agent(AgentMessage),
  96    Resume,
  97}
  98
  99impl Message {
 100    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 101        match self {
 102            Message::Agent(agent_message) => Some(agent_message),
 103            _ => None,
 104        }
 105    }
 106
 107    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 108        match self {
 109            Message::User(message) => vec![message.to_request()],
 110            Message::Agent(message) => message.to_request(),
 111            Message::Resume => vec![LanguageModelRequestMessage {
 112                role: Role::User,
 113                content: vec!["Continue where you left off".into()],
 114                cache: false,
 115            }],
 116        }
 117    }
 118
 119    pub fn to_markdown(&self) -> String {
 120        match self {
 121            Message::User(message) => message.to_markdown(),
 122            Message::Agent(message) => message.to_markdown(),
 123            Message::Resume => "[resume]\n".into(),
 124        }
 125    }
 126
 127    pub fn role(&self) -> Role {
 128        match self {
 129            Message::User(_) | Message::Resume => Role::User,
 130            Message::Agent(_) => Role::Assistant,
 131        }
 132    }
 133}
 134
 135#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 136pub struct UserMessage {
 137    pub id: UserMessageId,
 138    pub content: Vec<UserMessageContent>,
 139}
 140
 141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 142pub enum UserMessageContent {
 143    Text(String),
 144    Mention { uri: MentionUri, content: String },
 145    Image(LanguageModelImage),
 146}
 147
 148impl UserMessage {
 149    pub fn to_markdown(&self) -> String {
 150        let mut markdown = String::from("## User\n\n");
 151
 152        for content in &self.content {
 153            match content {
 154                UserMessageContent::Text(text) => {
 155                    markdown.push_str(text);
 156                    markdown.push('\n');
 157                }
 158                UserMessageContent::Image(_) => {
 159                    markdown.push_str("<image />\n");
 160                }
 161                UserMessageContent::Mention { uri, content } => {
 162                    if !content.is_empty() {
 163                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 164                    } else {
 165                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 166                    }
 167                }
 168            }
 169        }
 170
 171        markdown
 172    }
 173
 174    fn to_request(&self) -> LanguageModelRequestMessage {
 175        let mut message = LanguageModelRequestMessage {
 176            role: Role::User,
 177            content: Vec::with_capacity(self.content.len()),
 178            cache: false,
 179        };
 180
 181        const OPEN_CONTEXT: &str = "<context>\n\
 182            The following items were attached by the user. \
 183            They are up-to-date and don't need to be re-read.\n\n";
 184
 185        const OPEN_FILES_TAG: &str = "<files>";
 186        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 187        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 188        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 189        const OPEN_THREADS_TAG: &str = "<threads>";
 190        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 191        const OPEN_RULES_TAG: &str =
 192            "<rules>\nThe user has specified the following rules that should be applied:\n";
 193
 194        let mut file_context = OPEN_FILES_TAG.to_string();
 195        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 196        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 197        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 198        let mut thread_context = OPEN_THREADS_TAG.to_string();
 199        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 200        let mut rules_context = OPEN_RULES_TAG.to_string();
 201
 202        for chunk in &self.content {
 203            let chunk = match chunk {
 204                UserMessageContent::Text(text) => {
 205                    language_model::MessageContent::Text(text.clone())
 206                }
 207                UserMessageContent::Image(value) => {
 208                    language_model::MessageContent::Image(value.clone())
 209                }
 210                UserMessageContent::Mention { uri, content } => {
 211                    match uri {
 212                        MentionUri::File { abs_path } => {
 213                            write!(
 214                                &mut file_context,
 215                                "\n{}",
 216                                MarkdownCodeBlock {
 217                                    tag: &codeblock_tag(abs_path, None),
 218                                    text: &content.to_string(),
 219                                }
 220                            )
 221                            .ok();
 222                        }
 223                        MentionUri::PastedImage => {
 224                            debug_panic!("pasted image URI should not be used in mention content")
 225                        }
 226                        MentionUri::Directory { .. } => {
 227                            write!(&mut directory_context, "\n{}\n", content).ok();
 228                        }
 229                        MentionUri::Symbol {
 230                            abs_path: path,
 231                            line_range,
 232                            ..
 233                        } => {
 234                            write!(
 235                                &mut symbol_context,
 236                                "\n{}",
 237                                MarkdownCodeBlock {
 238                                    tag: &codeblock_tag(path, Some(line_range)),
 239                                    text: content
 240                                }
 241                            )
 242                            .ok();
 243                        }
 244                        MentionUri::Selection {
 245                            abs_path: path,
 246                            line_range,
 247                            ..
 248                        } => {
 249                            write!(
 250                                &mut selection_context,
 251                                "\n{}",
 252                                MarkdownCodeBlock {
 253                                    tag: &codeblock_tag(
 254                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 255                                        Some(line_range)
 256                                    ),
 257                                    text: content
 258                                }
 259                            )
 260                            .ok();
 261                        }
 262                        MentionUri::Thread { .. } => {
 263                            write!(&mut thread_context, "\n{}\n", content).ok();
 264                        }
 265                        MentionUri::TextThread { .. } => {
 266                            write!(&mut thread_context, "\n{}\n", content).ok();
 267                        }
 268                        MentionUri::Rule { .. } => {
 269                            write!(
 270                                &mut rules_context,
 271                                "\n{}",
 272                                MarkdownCodeBlock {
 273                                    tag: "",
 274                                    text: content
 275                                }
 276                            )
 277                            .ok();
 278                        }
 279                        MentionUri::Fetch { url } => {
 280                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 281                        }
 282                    }
 283
 284                    language_model::MessageContent::Text(uri.as_link().to_string())
 285                }
 286            };
 287
 288            message.content.push(chunk);
 289        }
 290
 291        let len_before_context = message.content.len();
 292
 293        if file_context.len() > OPEN_FILES_TAG.len() {
 294            file_context.push_str("</files>\n");
 295            message
 296                .content
 297                .push(language_model::MessageContent::Text(file_context));
 298        }
 299
 300        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 301            directory_context.push_str("</directories>\n");
 302            message
 303                .content
 304                .push(language_model::MessageContent::Text(directory_context));
 305        }
 306
 307        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 308            symbol_context.push_str("</symbols>\n");
 309            message
 310                .content
 311                .push(language_model::MessageContent::Text(symbol_context));
 312        }
 313
 314        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 315            selection_context.push_str("</selections>\n");
 316            message
 317                .content
 318                .push(language_model::MessageContent::Text(selection_context));
 319        }
 320
 321        if thread_context.len() > OPEN_THREADS_TAG.len() {
 322            thread_context.push_str("</threads>\n");
 323            message
 324                .content
 325                .push(language_model::MessageContent::Text(thread_context));
 326        }
 327
 328        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 329            fetch_context.push_str("</fetched_urls>\n");
 330            message
 331                .content
 332                .push(language_model::MessageContent::Text(fetch_context));
 333        }
 334
 335        if rules_context.len() > OPEN_RULES_TAG.len() {
 336            rules_context.push_str("</user_rules>\n");
 337            message
 338                .content
 339                .push(language_model::MessageContent::Text(rules_context));
 340        }
 341
 342        if message.content.len() > len_before_context {
 343            message.content.insert(
 344                len_before_context,
 345                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 346            );
 347            message
 348                .content
 349                .push(language_model::MessageContent::Text("</context>".into()));
 350        }
 351
 352        message
 353    }
 354}
 355
 356fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 357    let mut result = String::new();
 358
 359    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 360        let _ = write!(result, "{} ", extension);
 361    }
 362
 363    let _ = write!(result, "{}", full_path.display());
 364
 365    if let Some(range) = line_range {
 366        if range.start() == range.end() {
 367            let _ = write!(result, ":{}", range.start() + 1);
 368        } else {
 369            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 370        }
 371    }
 372
 373    result
 374}
 375
 376impl AgentMessage {
 377    pub fn to_markdown(&self) -> String {
 378        let mut markdown = String::from("## Assistant\n\n");
 379
 380        for content in &self.content {
 381            match content {
 382                AgentMessageContent::Text(text) => {
 383                    markdown.push_str(text);
 384                    markdown.push('\n');
 385                }
 386                AgentMessageContent::Thinking { text, .. } => {
 387                    markdown.push_str("<think>");
 388                    markdown.push_str(text);
 389                    markdown.push_str("</think>\n");
 390                }
 391                AgentMessageContent::RedactedThinking(_) => {
 392                    markdown.push_str("<redacted_thinking />\n")
 393                }
 394                AgentMessageContent::ToolUse(tool_use) => {
 395                    markdown.push_str(&format!(
 396                        "**Tool Use**: {} (ID: {})\n",
 397                        tool_use.name, tool_use.id
 398                    ));
 399                    markdown.push_str(&format!(
 400                        "{}\n",
 401                        MarkdownCodeBlock {
 402                            tag: "json",
 403                            text: &format!("{:#}", tool_use.input)
 404                        }
 405                    ));
 406                }
 407            }
 408        }
 409
 410        for tool_result in self.tool_results.values() {
 411            markdown.push_str(&format!(
 412                "**Tool Result**: {} (ID: {})\n\n",
 413                tool_result.tool_name, tool_result.tool_use_id
 414            ));
 415            if tool_result.is_error {
 416                markdown.push_str("**ERROR:**\n");
 417            }
 418
 419            match &tool_result.content {
 420                LanguageModelToolResultContent::Text(text) => {
 421                    writeln!(markdown, "{text}\n").ok();
 422                }
 423                LanguageModelToolResultContent::Image(_) => {
 424                    writeln!(markdown, "<image />\n").ok();
 425                }
 426            }
 427
 428            if let Some(output) = tool_result.output.as_ref() {
 429                writeln!(
 430                    markdown,
 431                    "**Debug Output**:\n\n```json\n{}\n```\n",
 432                    serde_json::to_string_pretty(output).unwrap()
 433                )
 434                .unwrap();
 435            }
 436        }
 437
 438        markdown
 439    }
 440
 441    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 442        let mut assistant_message = LanguageModelRequestMessage {
 443            role: Role::Assistant,
 444            content: Vec::with_capacity(self.content.len()),
 445            cache: false,
 446        };
 447        for chunk in &self.content {
 448            match chunk {
 449                AgentMessageContent::Text(text) => {
 450                    assistant_message
 451                        .content
 452                        .push(language_model::MessageContent::Text(text.clone()));
 453                }
 454                AgentMessageContent::Thinking { text, signature } => {
 455                    assistant_message
 456                        .content
 457                        .push(language_model::MessageContent::Thinking {
 458                            text: text.clone(),
 459                            signature: signature.clone(),
 460                        });
 461                }
 462                AgentMessageContent::RedactedThinking(value) => {
 463                    assistant_message.content.push(
 464                        language_model::MessageContent::RedactedThinking(value.clone()),
 465                    );
 466                }
 467                AgentMessageContent::ToolUse(tool_use) => {
 468                    if self.tool_results.contains_key(&tool_use.id) {
 469                        assistant_message
 470                            .content
 471                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 472                    }
 473                }
 474            };
 475        }
 476
 477        let mut user_message = LanguageModelRequestMessage {
 478            role: Role::User,
 479            content: Vec::new(),
 480            cache: false,
 481        };
 482
 483        for tool_result in self.tool_results.values() {
 484            let mut tool_result = tool_result.clone();
 485            // Surprisingly, the API fails if we return an empty string here.
 486            // It thinks we are sending a tool use without a tool result.
 487            if tool_result.content.is_empty() {
 488                tool_result.content = "<Tool returned an empty string>".into();
 489            }
 490            user_message
 491                .content
 492                .push(language_model::MessageContent::ToolResult(tool_result));
 493        }
 494
 495        let mut messages = Vec::new();
 496        if !assistant_message.content.is_empty() {
 497            messages.push(assistant_message);
 498        }
 499        if !user_message.content.is_empty() {
 500            messages.push(user_message);
 501        }
 502        messages
 503    }
 504}
 505
 506#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 507pub struct AgentMessage {
 508    pub content: Vec<AgentMessageContent>,
 509    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
 510}
 511
 512#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 513pub enum AgentMessageContent {
 514    Text(String),
 515    Thinking {
 516        text: String,
 517        signature: Option<String>,
 518    },
 519    RedactedThinking(String),
 520    ToolUse(LanguageModelToolUse),
 521}
 522
 523pub trait TerminalHandle {
 524    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
 525    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
 526    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
 527}
 528
 529pub trait ThreadEnvironment {
 530    fn create_terminal(
 531        &self,
 532        command: String,
 533        cwd: Option<PathBuf>,
 534        output_byte_limit: Option<u64>,
 535        cx: &mut AsyncApp,
 536    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
 537}
 538
 539#[derive(Debug)]
 540pub enum ThreadEvent {
 541    UserMessage(UserMessage),
 542    AgentText(String),
 543    AgentThinking(String),
 544    ToolCall(acp::ToolCall),
 545    ToolCallUpdate(acp_thread::ToolCallUpdate),
 546    ToolCallAuthorization(ToolCallAuthorization),
 547    Retry(acp_thread::RetryStatus),
 548    Stop(acp::StopReason),
 549}
 550
 551#[derive(Debug)]
 552pub struct NewTerminal {
 553    pub command: String,
 554    pub output_byte_limit: Option<u64>,
 555    pub cwd: Option<PathBuf>,
 556    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
 557}
 558
 559#[derive(Debug)]
 560pub struct ToolCallAuthorization {
 561    pub tool_call: acp::ToolCallUpdate,
 562    pub options: Vec<acp::PermissionOption>,
 563    pub response: oneshot::Sender<acp::PermissionOptionId>,
 564}
 565
 566#[derive(Debug, thiserror::Error)]
 567enum CompletionError {
 568    #[error("max tokens")]
 569    MaxTokens,
 570    #[error("refusal")]
 571    Refusal,
 572    #[error(transparent)]
 573    Other(#[from] anyhow::Error),
 574}
 575
 576pub struct Thread {
 577    id: acp::SessionId,
 578    prompt_id: PromptId,
 579    updated_at: DateTime<Utc>,
 580    title: Option<SharedString>,
 581    pending_title_generation: Option<Task<()>>,
 582    pending_summary_generation: Option<Shared<Task<Option<SharedString>>>>,
 583    summary: Option<SharedString>,
 584    messages: Vec<Message>,
 585    user_store: Entity<UserStore>,
 586    completion_mode: CompletionMode,
 587    /// Holds the task that handles agent interaction until the end of the turn.
 588    /// Survives across multiple requests as the model performs tool calls and
 589    /// we run tools, report their results.
 590    running_turn: Option<RunningTurn>,
 591    pending_message: Option<AgentMessage>,
 592    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
 593    tool_use_limit_reached: bool,
 594    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
 595    #[allow(unused)]
 596    cumulative_token_usage: TokenUsage,
 597    #[allow(unused)]
 598    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
 599    context_server_registry: Entity<ContextServerRegistry>,
 600    profile_id: AgentProfileId,
 601    project_context: Entity<ProjectContext>,
 602    templates: Arc<Templates>,
 603    model: Option<Arc<dyn LanguageModel>>,
 604    summarization_model: Option<Arc<dyn LanguageModel>>,
 605    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
 606    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
 607    pub(crate) project: Entity<Project>,
 608    pub(crate) action_log: Entity<ActionLog>,
 609}
 610
 611impl Thread {
 612    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 613        let image = model.map_or(true, |model| model.supports_images());
 614        acp::PromptCapabilities {
 615            meta: None,
 616            image,
 617            audio: false,
 618            embedded_context: true,
 619        }
 620    }
 621
 622    pub fn new(
 623        project: Entity<Project>,
 624        project_context: Entity<ProjectContext>,
 625        context_server_registry: Entity<ContextServerRegistry>,
 626        templates: Arc<Templates>,
 627        model: Option<Arc<dyn LanguageModel>>,
 628        cx: &mut Context<Self>,
 629    ) -> Self {
 630        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 631        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 632        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 633            watch::channel(Self::prompt_capabilities(model.as_deref()));
 634        Self {
 635            id: acp::SessionId(uuid::Uuid::new_v4().to_string().into()),
 636            prompt_id: PromptId::new(),
 637            updated_at: Utc::now(),
 638            title: None,
 639            pending_title_generation: None,
 640            pending_summary_generation: None,
 641            summary: None,
 642            messages: Vec::new(),
 643            user_store: project.read(cx).user_store(),
 644            completion_mode: AgentSettings::get_global(cx).preferred_completion_mode,
 645            running_turn: None,
 646            pending_message: None,
 647            tools: BTreeMap::default(),
 648            tool_use_limit_reached: false,
 649            request_token_usage: HashMap::default(),
 650            cumulative_token_usage: TokenUsage::default(),
 651            initial_project_snapshot: {
 652                let project_snapshot = Self::project_snapshot(project.clone(), cx);
 653                cx.foreground_executor()
 654                    .spawn(async move { Some(project_snapshot.await) })
 655                    .shared()
 656            },
 657            context_server_registry,
 658            profile_id,
 659            project_context,
 660            templates,
 661            model,
 662            summarization_model: None,
 663            prompt_capabilities_tx,
 664            prompt_capabilities_rx,
 665            project,
 666            action_log,
 667        }
 668    }
 669
 670    pub fn id(&self) -> &acp::SessionId {
 671        &self.id
 672    }
 673
 674    pub fn replay(
 675        &mut self,
 676        cx: &mut Context<Self>,
 677    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 678        let (tx, rx) = mpsc::unbounded();
 679        let stream = ThreadEventStream(tx);
 680        for message in &self.messages {
 681            match message {
 682                Message::User(user_message) => stream.send_user_message(user_message),
 683                Message::Agent(assistant_message) => {
 684                    for content in &assistant_message.content {
 685                        match content {
 686                            AgentMessageContent::Text(text) => stream.send_text(text),
 687                            AgentMessageContent::Thinking { text, .. } => {
 688                                stream.send_thinking(text)
 689                            }
 690                            AgentMessageContent::RedactedThinking(_) => {}
 691                            AgentMessageContent::ToolUse(tool_use) => {
 692                                self.replay_tool_call(
 693                                    tool_use,
 694                                    assistant_message.tool_results.get(&tool_use.id),
 695                                    &stream,
 696                                    cx,
 697                                );
 698                            }
 699                        }
 700                    }
 701                }
 702                Message::Resume => {}
 703            }
 704        }
 705        rx
 706    }
 707
 708    fn replay_tool_call(
 709        &self,
 710        tool_use: &LanguageModelToolUse,
 711        tool_result: Option<&LanguageModelToolResult>,
 712        stream: &ThreadEventStream,
 713        cx: &mut Context<Self>,
 714    ) {
 715        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 716            self.context_server_registry
 717                .read(cx)
 718                .servers()
 719                .find_map(|(_, tools)| {
 720                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 721                        Some(tool.clone())
 722                    } else {
 723                        None
 724                    }
 725                })
 726        });
 727
 728        let Some(tool) = tool else {
 729            stream
 730                .0
 731                .unbounded_send(Ok(ThreadEvent::ToolCall(acp::ToolCall {
 732                    meta: None,
 733                    id: acp::ToolCallId(tool_use.id.to_string().into()),
 734                    title: tool_use.name.to_string(),
 735                    kind: acp::ToolKind::Other,
 736                    status: acp::ToolCallStatus::Failed,
 737                    content: Vec::new(),
 738                    locations: Vec::new(),
 739                    raw_input: Some(tool_use.input.clone()),
 740                    raw_output: None,
 741                })))
 742                .ok();
 743            return;
 744        };
 745
 746        let title = tool.initial_title(tool_use.input.clone(), cx);
 747        let kind = tool.kind();
 748        stream.send_tool_call(
 749            &tool_use.id,
 750            &tool_use.name,
 751            title,
 752            kind,
 753            tool_use.input.clone(),
 754        );
 755
 756        let output = tool_result
 757            .as_ref()
 758            .and_then(|result| result.output.clone());
 759        if let Some(output) = output.clone() {
 760            let tool_event_stream = ToolCallEventStream::new(
 761                tool_use.id.clone(),
 762                stream.clone(),
 763                Some(self.project.read(cx).fs().clone()),
 764            );
 765            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 766                .log_err();
 767        }
 768
 769        stream.update_tool_call_fields(
 770            &tool_use.id,
 771            acp::ToolCallUpdateFields {
 772                status: Some(
 773                    tool_result
 774                        .as_ref()
 775                        .map_or(acp::ToolCallStatus::Failed, |result| {
 776                            if result.is_error {
 777                                acp::ToolCallStatus::Failed
 778                            } else {
 779                                acp::ToolCallStatus::Completed
 780                            }
 781                        }),
 782                ),
 783                raw_output: output,
 784                ..Default::default()
 785            },
 786        );
 787    }
 788
 789    pub fn from_db(
 790        id: acp::SessionId,
 791        db_thread: DbThread,
 792        project: Entity<Project>,
 793        project_context: Entity<ProjectContext>,
 794        context_server_registry: Entity<ContextServerRegistry>,
 795        templates: Arc<Templates>,
 796        cx: &mut Context<Self>,
 797    ) -> Self {
 798        let profile_id = db_thread
 799            .profile
 800            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
 801        let model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
 802            db_thread
 803                .model
 804                .and_then(|model| {
 805                    let model = SelectedModel {
 806                        provider: model.provider.clone().into(),
 807                        model: model.model.into(),
 808                    };
 809                    registry.select_model(&model, cx)
 810                })
 811                .or_else(|| registry.default_model())
 812                .map(|model| model.model)
 813        });
 814        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 815            watch::channel(Self::prompt_capabilities(model.as_deref()));
 816
 817        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 818
 819        Self {
 820            id,
 821            prompt_id: PromptId::new(),
 822            title: if db_thread.title.is_empty() {
 823                None
 824            } else {
 825                Some(db_thread.title.clone())
 826            },
 827            pending_title_generation: None,
 828            pending_summary_generation: None,
 829            summary: db_thread.detailed_summary,
 830            messages: db_thread.messages,
 831            user_store: project.read(cx).user_store(),
 832            completion_mode: db_thread.completion_mode.unwrap_or_default(),
 833            running_turn: None,
 834            pending_message: None,
 835            tools: BTreeMap::default(),
 836            tool_use_limit_reached: false,
 837            request_token_usage: db_thread.request_token_usage.clone(),
 838            cumulative_token_usage: db_thread.cumulative_token_usage,
 839            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
 840            context_server_registry,
 841            profile_id,
 842            project_context,
 843            templates,
 844            model,
 845            summarization_model: None,
 846            project,
 847            action_log,
 848            updated_at: db_thread.updated_at,
 849            prompt_capabilities_tx,
 850            prompt_capabilities_rx,
 851        }
 852    }
 853
 854    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
 855        let initial_project_snapshot = self.initial_project_snapshot.clone();
 856        let mut thread = DbThread {
 857            title: self.title(),
 858            messages: self.messages.clone(),
 859            updated_at: self.updated_at,
 860            detailed_summary: self.summary.clone(),
 861            initial_project_snapshot: None,
 862            cumulative_token_usage: self.cumulative_token_usage,
 863            request_token_usage: self.request_token_usage.clone(),
 864            model: self.model.as_ref().map(|model| DbLanguageModel {
 865                provider: model.provider_id().to_string(),
 866                model: model.name().0.to_string(),
 867            }),
 868            completion_mode: Some(self.completion_mode),
 869            profile: Some(self.profile_id.clone()),
 870        };
 871
 872        cx.background_spawn(async move {
 873            let initial_project_snapshot = initial_project_snapshot.await;
 874            thread.initial_project_snapshot = initial_project_snapshot;
 875            thread
 876        })
 877    }
 878
 879    /// Create a snapshot of the current project state including git information and unsaved buffers.
 880    fn project_snapshot(
 881        project: Entity<Project>,
 882        cx: &mut Context<Self>,
 883    ) -> Task<Arc<ProjectSnapshot>> {
 884        let task = project::telemetry_snapshot::TelemetrySnapshot::new(&project, cx);
 885        cx.spawn(async move |_, _| {
 886            let snapshot = task.await;
 887
 888            Arc::new(ProjectSnapshot {
 889                worktree_snapshots: snapshot.worktree_snapshots,
 890                timestamp: Utc::now(),
 891            })
 892        })
 893    }
 894
 895    pub fn project_context(&self) -> &Entity<ProjectContext> {
 896        &self.project_context
 897    }
 898
 899    pub fn project(&self) -> &Entity<Project> {
 900        &self.project
 901    }
 902
 903    pub fn action_log(&self) -> &Entity<ActionLog> {
 904        &self.action_log
 905    }
 906
 907    pub fn is_empty(&self) -> bool {
 908        self.messages.is_empty() && self.title.is_none()
 909    }
 910
 911    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
 912        self.model.as_ref()
 913    }
 914
 915    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
 916        let old_usage = self.latest_token_usage();
 917        self.model = Some(model);
 918        let new_caps = Self::prompt_capabilities(self.model.as_deref());
 919        let new_usage = self.latest_token_usage();
 920        if old_usage != new_usage {
 921            cx.emit(TokenUsageUpdated(new_usage));
 922        }
 923        self.prompt_capabilities_tx.send(new_caps).log_err();
 924        cx.notify()
 925    }
 926
 927    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
 928        self.summarization_model.as_ref()
 929    }
 930
 931    pub fn set_summarization_model(
 932        &mut self,
 933        model: Option<Arc<dyn LanguageModel>>,
 934        cx: &mut Context<Self>,
 935    ) {
 936        self.summarization_model = model;
 937        cx.notify()
 938    }
 939
 940    pub fn completion_mode(&self) -> CompletionMode {
 941        self.completion_mode
 942    }
 943
 944    pub fn set_completion_mode(&mut self, mode: CompletionMode, cx: &mut Context<Self>) {
 945        let old_usage = self.latest_token_usage();
 946        self.completion_mode = mode;
 947        let new_usage = self.latest_token_usage();
 948        if old_usage != new_usage {
 949            cx.emit(TokenUsageUpdated(new_usage));
 950        }
 951        cx.notify()
 952    }
 953
 954    #[cfg(any(test, feature = "test-support"))]
 955    pub fn last_message(&self) -> Option<Message> {
 956        if let Some(message) = self.pending_message.clone() {
 957            Some(Message::Agent(message))
 958        } else {
 959            self.messages.last().cloned()
 960        }
 961    }
 962
 963    pub fn add_default_tools(
 964        &mut self,
 965        environment: Rc<dyn ThreadEnvironment>,
 966        cx: &mut Context<Self>,
 967    ) {
 968        let language_registry = self.project.read(cx).languages().clone();
 969        self.add_tool(CopyPathTool::new(self.project.clone()));
 970        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
 971        self.add_tool(DeletePathTool::new(
 972            self.project.clone(),
 973            self.action_log.clone(),
 974        ));
 975        self.add_tool(DiagnosticsTool::new(self.project.clone()));
 976        self.add_tool(EditFileTool::new(
 977            self.project.clone(),
 978            cx.weak_entity(),
 979            language_registry,
 980            Templates::new(),
 981        ));
 982        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
 983        self.add_tool(FindPathTool::new(self.project.clone()));
 984        self.add_tool(GrepTool::new(self.project.clone()));
 985        self.add_tool(ListDirectoryTool::new(self.project.clone()));
 986        self.add_tool(MovePathTool::new(self.project.clone()));
 987        self.add_tool(NowTool);
 988        self.add_tool(OpenTool::new(self.project.clone()));
 989        self.add_tool(ReadFileTool::new(
 990            self.project.clone(),
 991            self.action_log.clone(),
 992        ));
 993        self.add_tool(TerminalTool::new(self.project.clone(), environment));
 994        self.add_tool(ThinkingTool);
 995        self.add_tool(WebSearchTool);
 996    }
 997
 998    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
 999        self.tools.insert(T::name().into(), tool.erase());
1000    }
1001
1002    pub fn remove_tool(&mut self, name: &str) -> bool {
1003        self.tools.remove(name).is_some()
1004    }
1005
1006    pub fn profile(&self) -> &AgentProfileId {
1007        &self.profile_id
1008    }
1009
1010    pub fn set_profile(&mut self, profile_id: AgentProfileId) {
1011        self.profile_id = profile_id;
1012    }
1013
1014    pub fn cancel(&mut self, cx: &mut Context<Self>) {
1015        if let Some(running_turn) = self.running_turn.take() {
1016            running_turn.cancel();
1017        }
1018        self.flush_pending_message(cx);
1019    }
1020
1021    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1022        let Some(last_user_message) = self.last_user_message() else {
1023            return;
1024        };
1025
1026        self.request_token_usage
1027            .insert(last_user_message.id.clone(), update);
1028        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1029        cx.notify();
1030    }
1031
1032    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1033        self.cancel(cx);
1034        let Some(position) = self.messages.iter().position(
1035            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1036        ) else {
1037            return Err(anyhow!("Message not found"));
1038        };
1039
1040        for message in self.messages.drain(position..) {
1041            match message {
1042                Message::User(message) => {
1043                    self.request_token_usage.remove(&message.id);
1044                }
1045                Message::Agent(_) | Message::Resume => {}
1046            }
1047        }
1048        self.clear_summary();
1049        cx.notify();
1050        Ok(())
1051    }
1052
1053    pub fn latest_request_token_usage(&self) -> Option<language_model::TokenUsage> {
1054        let last_user_message = self.last_user_message()?;
1055        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1056        Some(*tokens)
1057    }
1058
1059    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1060        let usage = self.latest_request_token_usage()?;
1061        let model = self.model.clone()?;
1062        Some(acp_thread::TokenUsage {
1063            max_tokens: model.max_token_count_for_mode(self.completion_mode.into()),
1064            used_tokens: usage.total_tokens(),
1065        })
1066    }
1067
1068    pub fn resume(
1069        &mut self,
1070        cx: &mut Context<Self>,
1071    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1072        self.messages.push(Message::Resume);
1073        cx.notify();
1074
1075        log::debug!("Total messages in thread: {}", self.messages.len());
1076        self.run_turn(cx)
1077    }
1078
1079    /// Sending a message results in the model streaming a response, which could include tool calls.
1080    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1081    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1082    pub fn send<T>(
1083        &mut self,
1084        id: UserMessageId,
1085        content: impl IntoIterator<Item = T>,
1086        cx: &mut Context<Self>,
1087    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1088    where
1089        T: Into<UserMessageContent>,
1090    {
1091        let model = self.model().context("No language model configured")?;
1092
1093        log::info!("Thread::send called with model: {}", model.name().0);
1094        self.advance_prompt_id();
1095
1096        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1097        log::debug!("Thread::send content: {:?}", content);
1098
1099        self.messages
1100            .push(Message::User(UserMessage { id, content }));
1101        cx.notify();
1102
1103        log::debug!("Total messages in thread: {}", self.messages.len());
1104        self.run_turn(cx)
1105    }
1106
1107    #[cfg(feature = "eval")]
1108    pub fn proceed(
1109        &mut self,
1110        cx: &mut Context<Self>,
1111    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1112        self.run_turn(cx)
1113    }
1114
1115    fn run_turn(
1116        &mut self,
1117        cx: &mut Context<Self>,
1118    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1119        self.cancel(cx);
1120
1121        let model = self.model.clone().context("No language model configured")?;
1122        let profile = AgentSettings::get_global(cx)
1123            .profiles
1124            .get(&self.profile_id)
1125            .context("Profile not found")?;
1126        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
1127        let event_stream = ThreadEventStream(events_tx);
1128        let message_ix = self.messages.len().saturating_sub(1);
1129        self.tool_use_limit_reached = false;
1130        self.clear_summary();
1131        self.running_turn = Some(RunningTurn {
1132            event_stream: event_stream.clone(),
1133            tools: self.enabled_tools(profile, &model, cx),
1134            _task: cx.spawn(async move |this, cx| {
1135                log::debug!("Starting agent turn execution");
1136
1137                let turn_result = Self::run_turn_internal(&this, model, &event_stream, cx).await;
1138                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));
1139
1140                match turn_result {
1141                    Ok(()) => {
1142                        log::debug!("Turn execution completed");
1143                        event_stream.send_stop(acp::StopReason::EndTurn);
1144                    }
1145                    Err(error) => {
1146                        log::error!("Turn execution failed: {:?}", error);
1147                        match error.downcast::<CompletionError>() {
1148                            Ok(CompletionError::Refusal) => {
1149                                event_stream.send_stop(acp::StopReason::Refusal);
1150                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
1151                            }
1152                            Ok(CompletionError::MaxTokens) => {
1153                                event_stream.send_stop(acp::StopReason::MaxTokens);
1154                            }
1155                            Ok(CompletionError::Other(error)) | Err(error) => {
1156                                event_stream.send_error(error);
1157                            }
1158                        }
1159                    }
1160                }
1161
1162                _ = this.update(cx, |this, _| this.running_turn.take());
1163            }),
1164        });
1165        Ok(events_rx)
1166    }
1167
1168    async fn run_turn_internal(
1169        this: &WeakEntity<Self>,
1170        model: Arc<dyn LanguageModel>,
1171        event_stream: &ThreadEventStream,
1172        cx: &mut AsyncApp,
1173    ) -> Result<()> {
1174        let mut attempt = 0;
1175        let mut intent = CompletionIntent::UserPrompt;
1176        loop {
1177            let request =
1178                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;
1179
1180            telemetry::event!(
1181                "Agent Thread Completion",
1182                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
1183                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
1184                model = model.telemetry_id(),
1185                model_provider = model.provider_id().to_string(),
1186                attempt
1187            );
1188
1189            log::debug!("Calling model.stream_completion, attempt {}", attempt);
1190
1191            let (mut events, mut error) = match model.stream_completion(request, cx).await {
1192                Ok(events) => (events, None),
1193                Err(err) => (stream::empty().boxed(), Some(err)),
1194            };
1195            let mut tool_results = FuturesUnordered::new();
1196            while let Some(event) = events.next().await {
1197                log::trace!("Received completion event: {:?}", event);
1198                match event {
1199                    Ok(event) => {
1200                        tool_results.extend(this.update(cx, |this, cx| {
1201                            this.handle_completion_event(event, event_stream, cx)
1202                        })??);
1203                    }
1204                    Err(err) => {
1205                        error = Some(err);
1206                        break;
1207                    }
1208                }
1209            }
1210
1211            let end_turn = tool_results.is_empty();
1212            while let Some(tool_result) = tool_results.next().await {
1213                log::debug!("Tool finished {:?}", tool_result);
1214
1215                event_stream.update_tool_call_fields(
1216                    &tool_result.tool_use_id,
1217                    acp::ToolCallUpdateFields {
1218                        status: Some(if tool_result.is_error {
1219                            acp::ToolCallStatus::Failed
1220                        } else {
1221                            acp::ToolCallStatus::Completed
1222                        }),
1223                        raw_output: tool_result.output.clone(),
1224                        ..Default::default()
1225                    },
1226                );
1227                this.update(cx, |this, _cx| {
1228                    this.pending_message()
1229                        .tool_results
1230                        .insert(tool_result.tool_use_id.clone(), tool_result);
1231                })?;
1232            }
1233
1234            this.update(cx, |this, cx| {
1235                this.flush_pending_message(cx);
1236                if this.title.is_none() && this.pending_title_generation.is_none() {
1237                    this.generate_title(cx);
1238                }
1239            })?;
1240
1241            if let Some(error) = error {
1242                attempt += 1;
1243                let retry = this.update(cx, |this, cx| {
1244                    let user_store = this.user_store.read(cx);
1245                    this.handle_completion_error(error, attempt, user_store.plan())
1246                })??;
1247                let timer = cx.background_executor().timer(retry.duration);
1248                event_stream.send_retry(retry);
1249                timer.await;
1250                this.update(cx, |this, _cx| {
1251                    if let Some(Message::Agent(message)) = this.messages.last() {
1252                        if message.tool_results.is_empty() {
1253                            intent = CompletionIntent::UserPrompt;
1254                            this.messages.push(Message::Resume);
1255                        }
1256                    }
1257                })?;
1258            } else if this.read_with(cx, |this, _| this.tool_use_limit_reached)? {
1259                return Err(language_model::ToolUseLimitReachedError.into());
1260            } else if end_turn {
1261                return Ok(());
1262            } else {
1263                intent = CompletionIntent::ToolResults;
1264                attempt = 0;
1265            }
1266        }
1267    }
1268
1269    fn handle_completion_error(
1270        &mut self,
1271        error: LanguageModelCompletionError,
1272        attempt: u8,
1273        plan: Option<Plan>,
1274    ) -> Result<acp_thread::RetryStatus> {
1275        let Some(model) = self.model.as_ref() else {
1276            return Err(anyhow!(error));
1277        };
1278
1279        let auto_retry = if model.provider_id() == ZED_CLOUD_PROVIDER_ID {
1280            match plan {
1281                Some(Plan::V2(_)) => true,
1282                Some(Plan::V1(_)) => self.completion_mode == CompletionMode::Burn,
1283                None => false,
1284            }
1285        } else {
1286            true
1287        };
1288
1289        if !auto_retry {
1290            return Err(anyhow!(error));
1291        }
1292
1293        let Some(strategy) = Self::retry_strategy_for(&error) else {
1294            return Err(anyhow!(error));
1295        };
1296
1297        let max_attempts = match &strategy {
1298            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1299            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1300        };
1301
1302        if attempt > max_attempts {
1303            return Err(anyhow!(error));
1304        }
1305
1306        let delay = match &strategy {
1307            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1308                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1309                Duration::from_secs(delay_secs)
1310            }
1311            RetryStrategy::Fixed { delay, .. } => *delay,
1312        };
1313        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1314
1315        Ok(acp_thread::RetryStatus {
1316            last_error: error.to_string().into(),
1317            attempt: attempt as usize,
1318            max_attempts: max_attempts as usize,
1319            started_at: Instant::now(),
1320            duration: delay,
1321        })
1322    }
1323
1324    /// A helper method that's called on every streamed completion event.
1325    /// Returns an optional tool result task, which the main agentic loop will
1326    /// send back to the model when it resolves.
1327    fn handle_completion_event(
1328        &mut self,
1329        event: LanguageModelCompletionEvent,
1330        event_stream: &ThreadEventStream,
1331        cx: &mut Context<Self>,
1332    ) -> Result<Option<Task<LanguageModelToolResult>>> {
1333        log::trace!("Handling streamed completion event: {:?}", event);
1334        use LanguageModelCompletionEvent::*;
1335
1336        match event {
1337            StartMessage { .. } => {
1338                self.flush_pending_message(cx);
1339                self.pending_message = Some(AgentMessage::default());
1340            }
1341            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
1342            Thinking { text, signature } => {
1343                self.handle_thinking_event(text, signature, event_stream, cx)
1344            }
1345            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
1346            ToolUse(tool_use) => {
1347                return Ok(self.handle_tool_use_event(tool_use, event_stream, cx));
1348            }
1349            ToolUseJsonParseError {
1350                id,
1351                tool_name,
1352                raw_input,
1353                json_parse_error,
1354            } => {
1355                return Ok(Some(Task::ready(
1356                    self.handle_tool_use_json_parse_error_event(
1357                        id,
1358                        tool_name,
1359                        raw_input,
1360                        json_parse_error,
1361                    ),
1362                )));
1363            }
1364            UsageUpdate(usage) => {
1365                telemetry::event!(
1366                    "Agent Thread Completion Usage Updated",
1367                    thread_id = self.id.to_string(),
1368                    prompt_id = self.prompt_id.to_string(),
1369                    model = self.model.as_ref().map(|m| m.telemetry_id()),
1370                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
1371                    input_tokens = usage.input_tokens,
1372                    output_tokens = usage.output_tokens,
1373                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
1374                    cache_read_input_tokens = usage.cache_read_input_tokens,
1375                );
1376                self.update_token_usage(usage, cx);
1377            }
1378            StatusUpdate(CompletionRequestStatus::UsageUpdated { amount, limit }) => {
1379                self.update_model_request_usage(amount, limit, cx);
1380            }
1381            StatusUpdate(
1382                CompletionRequestStatus::Started
1383                | CompletionRequestStatus::Queued { .. }
1384                | CompletionRequestStatus::Failed { .. },
1385            ) => {}
1386            StatusUpdate(CompletionRequestStatus::ToolUseLimitReached) => {
1387                self.tool_use_limit_reached = true;
1388            }
1389            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
1390            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
1391            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
1392        }
1393
1394        Ok(None)
1395    }
1396
1397    fn handle_text_event(
1398        &mut self,
1399        new_text: String,
1400        event_stream: &ThreadEventStream,
1401        cx: &mut Context<Self>,
1402    ) {
1403        event_stream.send_text(&new_text);
1404
1405        let last_message = self.pending_message();
1406        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1407            text.push_str(&new_text);
1408        } else {
1409            last_message
1410                .content
1411                .push(AgentMessageContent::Text(new_text));
1412        }
1413
1414        cx.notify();
1415    }
1416
1417    fn handle_thinking_event(
1418        &mut self,
1419        new_text: String,
1420        new_signature: Option<String>,
1421        event_stream: &ThreadEventStream,
1422        cx: &mut Context<Self>,
1423    ) {
1424        event_stream.send_thinking(&new_text);
1425
1426        let last_message = self.pending_message();
1427        if let Some(AgentMessageContent::Thinking { text, signature }) =
1428            last_message.content.last_mut()
1429        {
1430            text.push_str(&new_text);
1431            *signature = new_signature.or(signature.take());
1432        } else {
1433            last_message.content.push(AgentMessageContent::Thinking {
1434                text: new_text,
1435                signature: new_signature,
1436            });
1437        }
1438
1439        cx.notify();
1440    }
1441
1442    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
1443        let last_message = self.pending_message();
1444        last_message
1445            .content
1446            .push(AgentMessageContent::RedactedThinking(data));
1447        cx.notify();
1448    }
1449
1450    fn handle_tool_use_event(
1451        &mut self,
1452        tool_use: LanguageModelToolUse,
1453        event_stream: &ThreadEventStream,
1454        cx: &mut Context<Self>,
1455    ) -> Option<Task<LanguageModelToolResult>> {
1456        cx.notify();
1457
1458        let tool = self.tool(tool_use.name.as_ref());
1459        let mut title = SharedString::from(&tool_use.name);
1460        let mut kind = acp::ToolKind::Other;
1461        if let Some(tool) = tool.as_ref() {
1462            title = tool.initial_title(tool_use.input.clone(), cx);
1463            kind = tool.kind();
1464        }
1465
1466        // Ensure the last message ends in the current tool use
1467        let last_message = self.pending_message();
1468        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
1469            if let AgentMessageContent::ToolUse(last_tool_use) = content {
1470                if last_tool_use.id == tool_use.id {
1471                    *last_tool_use = tool_use.clone();
1472                    false
1473                } else {
1474                    true
1475                }
1476            } else {
1477                true
1478            }
1479        });
1480
1481        if push_new_tool_use {
1482            event_stream.send_tool_call(
1483                &tool_use.id,
1484                &tool_use.name,
1485                title,
1486                kind,
1487                tool_use.input.clone(),
1488            );
1489            last_message
1490                .content
1491                .push(AgentMessageContent::ToolUse(tool_use.clone()));
1492        } else {
1493            event_stream.update_tool_call_fields(
1494                &tool_use.id,
1495                acp::ToolCallUpdateFields {
1496                    title: Some(title.into()),
1497                    kind: Some(kind),
1498                    raw_input: Some(tool_use.input.clone()),
1499                    ..Default::default()
1500                },
1501            );
1502        }
1503
1504        if !tool_use.is_input_complete {
1505            return None;
1506        }
1507
1508        let Some(tool) = tool else {
1509            let content = format!("No tool named {} exists", tool_use.name);
1510            return Some(Task::ready(LanguageModelToolResult {
1511                content: LanguageModelToolResultContent::Text(Arc::from(content)),
1512                tool_use_id: tool_use.id,
1513                tool_name: tool_use.name,
1514                is_error: true,
1515                output: None,
1516            }));
1517        };
1518
1519        let fs = self.project.read(cx).fs().clone();
1520        let tool_event_stream =
1521            ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
1522        tool_event_stream.update_fields(acp::ToolCallUpdateFields {
1523            status: Some(acp::ToolCallStatus::InProgress),
1524            ..Default::default()
1525        });
1526        let supports_images = self.model().is_some_and(|model| model.supports_images());
1527        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
1528        log::debug!("Running tool {}", tool_use.name);
1529        Some(cx.foreground_executor().spawn(async move {
1530            let tool_result = tool_result.await.and_then(|output| {
1531                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
1532                    && !supports_images
1533                {
1534                    return Err(anyhow!(
1535                        "Attempted to read an image, but this model doesn't support it.",
1536                    ));
1537                }
1538                Ok(output)
1539            });
1540
1541            match tool_result {
1542                Ok(output) => LanguageModelToolResult {
1543                    tool_use_id: tool_use.id,
1544                    tool_name: tool_use.name,
1545                    is_error: false,
1546                    content: output.llm_output,
1547                    output: Some(output.raw_output),
1548                },
1549                Err(error) => LanguageModelToolResult {
1550                    tool_use_id: tool_use.id,
1551                    tool_name: tool_use.name,
1552                    is_error: true,
1553                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
1554                    output: Some(error.to_string().into()),
1555                },
1556            }
1557        }))
1558    }
1559
1560    fn handle_tool_use_json_parse_error_event(
1561        &mut self,
1562        tool_use_id: LanguageModelToolUseId,
1563        tool_name: Arc<str>,
1564        raw_input: Arc<str>,
1565        json_parse_error: String,
1566    ) -> LanguageModelToolResult {
1567        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
1568        LanguageModelToolResult {
1569            tool_use_id,
1570            tool_name,
1571            is_error: true,
1572            content: LanguageModelToolResultContent::Text(tool_output.into()),
1573            output: Some(serde_json::Value::String(raw_input.to_string())),
1574        }
1575    }
1576
1577    fn update_model_request_usage(&self, amount: usize, limit: UsageLimit, cx: &mut Context<Self>) {
1578        self.project
1579            .read(cx)
1580            .user_store()
1581            .update(cx, |user_store, cx| {
1582                user_store.update_model_request_usage(
1583                    ModelRequestUsage(RequestUsage {
1584                        amount: amount as i32,
1585                        limit,
1586                    }),
1587                    cx,
1588                )
1589            });
1590    }
1591
1592    pub fn title(&self) -> SharedString {
1593        self.title.clone().unwrap_or("New Thread".into())
1594    }
1595
1596    pub fn is_generating_summary(&self) -> bool {
1597        self.pending_summary_generation.is_some()
1598    }
1599
1600    pub fn summary(&mut self, cx: &mut Context<Self>) -> Shared<Task<Option<SharedString>>> {
1601        if let Some(summary) = self.summary.as_ref() {
1602            return Task::ready(Some(summary.clone())).shared();
1603        }
1604        if let Some(task) = self.pending_summary_generation.clone() {
1605            return task;
1606        }
1607        let Some(model) = self.summarization_model.clone() else {
1608            log::error!("No summarization model available");
1609            return Task::ready(None).shared();
1610        };
1611        let mut request = LanguageModelRequest {
1612            intent: Some(CompletionIntent::ThreadContextSummarization),
1613            temperature: AgentSettings::temperature_for_model(&model, cx),
1614            ..Default::default()
1615        };
1616
1617        for message in &self.messages {
1618            request.messages.extend(message.to_request());
1619        }
1620
1621        request.messages.push(LanguageModelRequestMessage {
1622            role: Role::User,
1623            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
1624            cache: false,
1625        });
1626
1627        let task = cx
1628            .spawn(async move |this, cx| {
1629                let mut summary = String::new();
1630                let mut messages = model.stream_completion(request, cx).await.log_err()?;
1631                while let Some(event) = messages.next().await {
1632                    let event = event.log_err()?;
1633                    let text = match event {
1634                        LanguageModelCompletionEvent::Text(text) => text,
1635                        LanguageModelCompletionEvent::StatusUpdate(
1636                            CompletionRequestStatus::UsageUpdated { amount, limit },
1637                        ) => {
1638                            this.update(cx, |thread, cx| {
1639                                thread.update_model_request_usage(amount, limit, cx);
1640                            })
1641                            .ok()?;
1642                            continue;
1643                        }
1644                        _ => continue,
1645                    };
1646
1647                    let mut lines = text.lines();
1648                    summary.extend(lines.next());
1649                }
1650
1651                log::debug!("Setting summary: {}", summary);
1652                let summary = SharedString::from(summary);
1653
1654                this.update(cx, |this, cx| {
1655                    this.summary = Some(summary.clone());
1656                    this.pending_summary_generation = None;
1657                    cx.notify()
1658                })
1659                .ok()?;
1660
1661                Some(summary)
1662            })
1663            .shared();
1664        self.pending_summary_generation = Some(task.clone());
1665        task
1666    }
1667
1668    fn generate_title(&mut self, cx: &mut Context<Self>) {
1669        let Some(model) = self.summarization_model.clone() else {
1670            return;
1671        };
1672
1673        log::debug!(
1674            "Generating title with model: {:?}",
1675            self.summarization_model.as_ref().map(|model| model.name())
1676        );
1677        let mut request = LanguageModelRequest {
1678            intent: Some(CompletionIntent::ThreadSummarization),
1679            temperature: AgentSettings::temperature_for_model(&model, cx),
1680            ..Default::default()
1681        };
1682
1683        for message in &self.messages {
1684            request.messages.extend(message.to_request());
1685        }
1686
1687        request.messages.push(LanguageModelRequestMessage {
1688            role: Role::User,
1689            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
1690            cache: false,
1691        });
1692        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
1693            let mut title = String::new();
1694
1695            let generate = async {
1696                let mut messages = model.stream_completion(request, cx).await?;
1697                while let Some(event) = messages.next().await {
1698                    let event = event?;
1699                    let text = match event {
1700                        LanguageModelCompletionEvent::Text(text) => text,
1701                        LanguageModelCompletionEvent::StatusUpdate(
1702                            CompletionRequestStatus::UsageUpdated { amount, limit },
1703                        ) => {
1704                            this.update(cx, |thread, cx| {
1705                                thread.update_model_request_usage(amount, limit, cx);
1706                            })?;
1707                            continue;
1708                        }
1709                        _ => continue,
1710                    };
1711
1712                    let mut lines = text.lines();
1713                    title.extend(lines.next());
1714
1715                    // Stop if the LLM generated multiple lines.
1716                    if lines.next().is_some() {
1717                        break;
1718                    }
1719                }
1720                anyhow::Ok(())
1721            };
1722
1723            if generate.await.context("failed to generate title").is_ok() {
1724                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
1725            }
1726            _ = this.update(cx, |this, _| this.pending_title_generation = None);
1727        }));
1728    }
1729
1730    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1731        self.pending_title_generation = None;
1732        if Some(&title) != self.title.as_ref() {
1733            self.title = Some(title);
1734            cx.emit(TitleUpdated);
1735            cx.notify();
1736        }
1737    }
1738
1739    fn clear_summary(&mut self) {
1740        self.summary = None;
1741        self.pending_summary_generation = None;
1742    }
1743
1744    fn last_user_message(&self) -> Option<&UserMessage> {
1745        self.messages
1746            .iter()
1747            .rev()
1748            .find_map(|message| match message {
1749                Message::User(user_message) => Some(user_message),
1750                Message::Agent(_) => None,
1751                Message::Resume => None,
1752            })
1753    }
1754
1755    fn pending_message(&mut self) -> &mut AgentMessage {
1756        self.pending_message.get_or_insert_default()
1757    }
1758
1759    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
1760        let Some(mut message) = self.pending_message.take() else {
1761            return;
1762        };
1763
1764        if message.content.is_empty() {
1765            return;
1766        }
1767
1768        for content in &message.content {
1769            let AgentMessageContent::ToolUse(tool_use) = content else {
1770                continue;
1771            };
1772
1773            if !message.tool_results.contains_key(&tool_use.id) {
1774                message.tool_results.insert(
1775                    tool_use.id.clone(),
1776                    LanguageModelToolResult {
1777                        tool_use_id: tool_use.id.clone(),
1778                        tool_name: tool_use.name.clone(),
1779                        is_error: true,
1780                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
1781                        output: None,
1782                    },
1783                );
1784            }
1785        }
1786
1787        self.messages.push(Message::Agent(message));
1788        self.updated_at = Utc::now();
1789        self.clear_summary();
1790        cx.notify()
1791    }
1792
1793    pub(crate) fn build_completion_request(
1794        &self,
1795        completion_intent: CompletionIntent,
1796        cx: &App,
1797    ) -> Result<LanguageModelRequest> {
1798        let model = self.model().context("No language model configured")?;
1799        let tools = if let Some(turn) = self.running_turn.as_ref() {
1800            turn.tools
1801                .iter()
1802                .filter_map(|(tool_name, tool)| {
1803                    log::trace!("Including tool: {}", tool_name);
1804                    Some(LanguageModelRequestTool {
1805                        name: tool_name.to_string(),
1806                        description: tool.description().to_string(),
1807                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1808                    })
1809                })
1810                .collect::<Vec<_>>()
1811        } else {
1812            Vec::new()
1813        };
1814
1815        log::debug!("Building completion request");
1816        log::debug!("Completion intent: {:?}", completion_intent);
1817        log::debug!("Completion mode: {:?}", self.completion_mode);
1818
1819        let available_tools: Vec<_> = self
1820            .running_turn
1821            .as_ref()
1822            .map(|turn| turn.tools.keys().cloned().collect())
1823            .unwrap_or_default();
1824
1825        log::debug!("Request includes {} tools", available_tools.len());
1826        let messages = self.build_request_messages(available_tools, cx);
1827        log::debug!("Request will include {} messages", messages.len());
1828
1829        let request = LanguageModelRequest {
1830            thread_id: Some(self.id.to_string()),
1831            prompt_id: Some(self.prompt_id.to_string()),
1832            intent: Some(completion_intent),
1833            mode: Some(self.completion_mode.into()),
1834            messages,
1835            tools,
1836            tool_choice: None,
1837            stop: Vec::new(),
1838            temperature: AgentSettings::temperature_for_model(model, cx),
1839            thinking_allowed: true,
1840        };
1841
1842        log::debug!("Completion request built successfully");
1843        Ok(request)
1844    }
1845
1846    fn enabled_tools(
1847        &self,
1848        profile: &AgentProfileSettings,
1849        model: &Arc<dyn LanguageModel>,
1850        cx: &App,
1851    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
1852        fn truncate(tool_name: &SharedString) -> SharedString {
1853            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
1854                let mut truncated = tool_name.to_string();
1855                truncated.truncate(MAX_TOOL_NAME_LENGTH);
1856                truncated.into()
1857            } else {
1858                tool_name.clone()
1859            }
1860        }
1861
1862        let mut tools = self
1863            .tools
1864            .iter()
1865            .filter_map(|(tool_name, tool)| {
1866                if tool.supports_provider(&model.provider_id())
1867                    && profile.is_tool_enabled(tool_name)
1868                {
1869                    Some((truncate(tool_name), tool.clone()))
1870                } else {
1871                    None
1872                }
1873            })
1874            .collect::<BTreeMap<_, _>>();
1875
1876        let mut context_server_tools = Vec::new();
1877        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
1878        let mut duplicate_tool_names = HashSet::default();
1879        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
1880            for (tool_name, tool) in server_tools {
1881                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
1882                    let tool_name = truncate(tool_name);
1883                    if !seen_tools.insert(tool_name.clone()) {
1884                        duplicate_tool_names.insert(tool_name.clone());
1885                    }
1886                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
1887                }
1888            }
1889        }
1890
1891        // When there are duplicate tool names, disambiguate by prefixing them
1892        // with the server ID. In the rare case there isn't enough space for the
1893        // disambiguated tool name, keep only the last tool with this name.
1894        for (server_id, tool_name, tool) in context_server_tools {
1895            if duplicate_tool_names.contains(&tool_name) {
1896                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
1897                if available >= 2 {
1898                    let mut disambiguated = server_id.0.to_string();
1899                    disambiguated.truncate(available - 1);
1900                    disambiguated.push('_');
1901                    disambiguated.push_str(&tool_name);
1902                    tools.insert(disambiguated.into(), tool.clone());
1903                } else {
1904                    tools.insert(tool_name, tool.clone());
1905                }
1906            } else {
1907                tools.insert(tool_name, tool.clone());
1908            }
1909        }
1910
1911        tools
1912    }
1913
1914    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
1915        self.running_turn.as_ref()?.tools.get(name).cloned()
1916    }
1917
1918    fn build_request_messages(
1919        &self,
1920        available_tools: Vec<SharedString>,
1921        cx: &App,
1922    ) -> Vec<LanguageModelRequestMessage> {
1923        log::trace!(
1924            "Building request messages from {} thread messages",
1925            self.messages.len()
1926        );
1927
1928        let system_prompt = SystemPromptTemplate {
1929            project: self.project_context.read(cx),
1930            available_tools,
1931        }
1932        .render(&self.templates)
1933        .context("failed to build system prompt")
1934        .expect("Invalid template");
1935        let mut messages = vec![LanguageModelRequestMessage {
1936            role: Role::System,
1937            content: vec![system_prompt.into()],
1938            cache: false,
1939        }];
1940        for message in &self.messages {
1941            messages.extend(message.to_request());
1942        }
1943
1944        if let Some(last_message) = messages.last_mut() {
1945            last_message.cache = true;
1946        }
1947
1948        if let Some(message) = self.pending_message.as_ref() {
1949            messages.extend(message.to_request());
1950        }
1951
1952        messages
1953    }
1954
1955    pub fn to_markdown(&self) -> String {
1956        let mut markdown = String::new();
1957        for (ix, message) in self.messages.iter().enumerate() {
1958            if ix > 0 {
1959                markdown.push('\n');
1960            }
1961            markdown.push_str(&message.to_markdown());
1962        }
1963
1964        if let Some(message) = self.pending_message.as_ref() {
1965            markdown.push('\n');
1966            markdown.push_str(&message.to_markdown());
1967        }
1968
1969        markdown
1970    }
1971
1972    fn advance_prompt_id(&mut self) {
1973        self.prompt_id = PromptId::new();
1974    }
1975
1976    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
1977        use LanguageModelCompletionError::*;
1978        use http_client::StatusCode;
1979
1980        // General strategy here:
1981        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
1982        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to 4 times with exponential backoff.
1983        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry up to 3 times.
1984        match error {
1985            HttpResponseError {
1986                status_code: StatusCode::TOO_MANY_REQUESTS,
1987                ..
1988            } => Some(RetryStrategy::ExponentialBackoff {
1989                initial_delay: BASE_RETRY_DELAY,
1990                max_attempts: MAX_RETRY_ATTEMPTS,
1991            }),
1992            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
1993                Some(RetryStrategy::Fixed {
1994                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
1995                    max_attempts: MAX_RETRY_ATTEMPTS,
1996                })
1997            }
1998            UpstreamProviderError {
1999                status,
2000                retry_after,
2001                ..
2002            } => match *status {
2003                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
2004                    Some(RetryStrategy::Fixed {
2005                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2006                        max_attempts: MAX_RETRY_ATTEMPTS,
2007                    })
2008                }
2009                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
2010                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2011                    // Internal Server Error could be anything, retry up to 3 times.
2012                    max_attempts: 3,
2013                }),
2014                status => {
2015                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
2016                    // but we frequently get them in practice. See https://http.dev/529
2017                    if status.as_u16() == 529 {
2018                        Some(RetryStrategy::Fixed {
2019                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2020                            max_attempts: MAX_RETRY_ATTEMPTS,
2021                        })
2022                    } else {
2023                        Some(RetryStrategy::Fixed {
2024                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
2025                            max_attempts: 2,
2026                        })
2027                    }
2028                }
2029            },
2030            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
2031                delay: BASE_RETRY_DELAY,
2032                max_attempts: 3,
2033            }),
2034            ApiReadResponseError { .. }
2035            | HttpSend { .. }
2036            | DeserializeResponse { .. }
2037            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
2038                delay: BASE_RETRY_DELAY,
2039                max_attempts: 3,
2040            }),
2041            // Retrying these errors definitely shouldn't help.
2042            HttpResponseError {
2043                status_code:
2044                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
2045                ..
2046            }
2047            | AuthenticationError { .. }
2048            | PermissionError { .. }
2049            | NoApiKey { .. }
2050            | ApiEndpointNotFound { .. }
2051            | PromptTooLarge { .. } => None,
2052            // These errors might be transient, so retry them
2053            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
2054                delay: BASE_RETRY_DELAY,
2055                max_attempts: 1,
2056            }),
2057            // Retry all other 4xx and 5xx errors once.
2058            HttpResponseError { status_code, .. }
2059                if status_code.is_client_error() || status_code.is_server_error() =>
2060            {
2061                Some(RetryStrategy::Fixed {
2062                    delay: BASE_RETRY_DELAY,
2063                    max_attempts: 3,
2064                })
2065            }
2066            Other(err)
2067                if err.is::<language_model::PaymentRequiredError>()
2068                    || err.is::<language_model::ModelRequestLimitReachedError>() =>
2069            {
2070                // Retrying won't help for Payment Required or Model Request Limit errors (where
2071                // the user must upgrade to usage-based billing to get more requests, or else wait
2072                // for a significant amount of time for the request limit to reset).
2073                None
2074            }
2075            // Conservatively assume that any other errors are non-retryable
2076            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
2077                delay: BASE_RETRY_DELAY,
2078                max_attempts: 2,
2079            }),
2080        }
2081    }
2082}
2083
2084struct RunningTurn {
2085    /// Holds the task that handles agent interaction until the end of the turn.
2086    /// Survives across multiple requests as the model performs tool calls and
2087    /// we run tools, report their results.
2088    _task: Task<()>,
2089    /// The current event stream for the running turn. Used to report a final
2090    /// cancellation event if we cancel the turn.
2091    event_stream: ThreadEventStream,
2092    /// The tools that were enabled for this turn.
2093    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
2094}
2095
2096impl RunningTurn {
2097    fn cancel(self) {
2098        log::debug!("Cancelling in progress turn");
2099        self.event_stream.send_canceled();
2100    }
2101}
2102
2103pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);
2104
2105impl EventEmitter<TokenUsageUpdated> for Thread {}
2106
2107pub struct TitleUpdated;
2108
2109impl EventEmitter<TitleUpdated> for Thread {}
2110
2111pub trait AgentTool
2112where
2113    Self: 'static + Sized,
2114{
2115    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
2116    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
2117
2118    fn name() -> &'static str;
2119
2120    fn description() -> SharedString {
2121        let schema = schemars::schema_for!(Self::Input);
2122        SharedString::new(
2123            schema
2124                .get("description")
2125                .and_then(|description| description.as_str())
2126                .unwrap_or_default(),
2127        )
2128    }
2129
2130    fn kind() -> acp::ToolKind;
2131
2132    /// The initial tool title to display. Can be updated during the tool run.
2133    fn initial_title(
2134        &self,
2135        input: Result<Self::Input, serde_json::Value>,
2136        cx: &mut App,
2137    ) -> SharedString;
2138
2139    /// Returns the JSON schema that describes the tool's input.
2140    fn input_schema(format: LanguageModelToolSchemaFormat) -> Schema {
2141        crate::tool_schema::root_schema_for::<Self::Input>(format)
2142    }
2143
2144    /// Some tools rely on a provider for the underlying billing or other reasons.
2145    /// Allow the tool to check if they are compatible, or should be filtered out.
2146    fn supports_provider(_provider: &LanguageModelProviderId) -> bool {
2147        true
2148    }
2149
2150    /// Runs the tool with the provided input.
2151    fn run(
2152        self: Arc<Self>,
2153        input: Self::Input,
2154        event_stream: ToolCallEventStream,
2155        cx: &mut App,
2156    ) -> Task<Result<Self::Output>>;
2157
2158    /// Emits events for a previous execution of the tool.
2159    fn replay(
2160        &self,
2161        _input: Self::Input,
2162        _output: Self::Output,
2163        _event_stream: ToolCallEventStream,
2164        _cx: &mut App,
2165    ) -> Result<()> {
2166        Ok(())
2167    }
2168
2169    fn erase(self) -> Arc<dyn AnyAgentTool> {
2170        Arc::new(Erased(Arc::new(self)))
2171    }
2172}
2173
2174pub struct Erased<T>(T);
2175
2176pub struct AgentToolOutput {
2177    pub llm_output: LanguageModelToolResultContent,
2178    pub raw_output: serde_json::Value,
2179}
2180
2181pub trait AnyAgentTool {
2182    fn name(&self) -> SharedString;
2183    fn description(&self) -> SharedString;
2184    fn kind(&self) -> acp::ToolKind;
2185    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
2186    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
2187    fn supports_provider(&self, _provider: &LanguageModelProviderId) -> bool {
2188        true
2189    }
2190    fn run(
2191        self: Arc<Self>,
2192        input: serde_json::Value,
2193        event_stream: ToolCallEventStream,
2194        cx: &mut App,
2195    ) -> Task<Result<AgentToolOutput>>;
2196    fn replay(
2197        &self,
2198        input: serde_json::Value,
2199        output: serde_json::Value,
2200        event_stream: ToolCallEventStream,
2201        cx: &mut App,
2202    ) -> Result<()>;
2203}
2204
2205impl<T> AnyAgentTool for Erased<Arc<T>>
2206where
2207    T: AgentTool,
2208{
2209    fn name(&self) -> SharedString {
2210        T::name().into()
2211    }
2212
2213    fn description(&self) -> SharedString {
2214        T::description()
2215    }
2216
2217    fn kind(&self) -> agent_client_protocol::ToolKind {
2218        T::kind()
2219    }
2220
2221    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2222        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2223        self.0.initial_title(parsed_input, _cx)
2224    }
2225
2226    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2227        let mut json = serde_json::to_value(T::input_schema(format))?;
2228        crate::tool_schema::adapt_schema_to_format(&mut json, format)?;
2229        Ok(json)
2230    }
2231
2232    fn supports_provider(&self, provider: &LanguageModelProviderId) -> bool {
2233        T::supports_provider(provider)
2234    }
2235
2236    fn run(
2237        self: Arc<Self>,
2238        input: serde_json::Value,
2239        event_stream: ToolCallEventStream,
2240        cx: &mut App,
2241    ) -> Task<Result<AgentToolOutput>> {
2242        cx.spawn(async move |cx| {
2243            let input = serde_json::from_value(input)?;
2244            let output = cx
2245                .update(|cx| self.0.clone().run(input, event_stream, cx))?
2246                .await?;
2247            let raw_output = serde_json::to_value(&output)?;
2248            Ok(AgentToolOutput {
2249                llm_output: output.into(),
2250                raw_output,
2251            })
2252        })
2253    }
2254
2255    fn replay(
2256        &self,
2257        input: serde_json::Value,
2258        output: serde_json::Value,
2259        event_stream: ToolCallEventStream,
2260        cx: &mut App,
2261    ) -> Result<()> {
2262        let input = serde_json::from_value(input)?;
2263        let output = serde_json::from_value(output)?;
2264        self.0.replay(input, output, event_stream, cx)
2265    }
2266}
2267
2268#[derive(Clone)]
2269struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2270
2271impl ThreadEventStream {
2272    fn send_user_message(&self, message: &UserMessage) {
2273        self.0
2274            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2275            .ok();
2276    }
2277
2278    fn send_text(&self, text: &str) {
2279        self.0
2280            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2281            .ok();
2282    }
2283
2284    fn send_thinking(&self, text: &str) {
2285        self.0
2286            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2287            .ok();
2288    }
2289
2290    fn send_tool_call(
2291        &self,
2292        id: &LanguageModelToolUseId,
2293        tool_name: &str,
2294        title: SharedString,
2295        kind: acp::ToolKind,
2296        input: serde_json::Value,
2297    ) {
2298        self.0
2299            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2300                id,
2301                tool_name,
2302                title.to_string(),
2303                kind,
2304                input,
2305            ))))
2306            .ok();
2307    }
2308
2309    fn initial_tool_call(
2310        id: &LanguageModelToolUseId,
2311        tool_name: &str,
2312        title: String,
2313        kind: acp::ToolKind,
2314        input: serde_json::Value,
2315    ) -> acp::ToolCall {
2316        acp::ToolCall {
2317            meta: Some(serde_json::json!({
2318                "tool_name": tool_name
2319            })),
2320            id: acp::ToolCallId(id.to_string().into()),
2321            title,
2322            kind,
2323            status: acp::ToolCallStatus::Pending,
2324            content: vec![],
2325            locations: vec![],
2326            raw_input: Some(input),
2327            raw_output: None,
2328        }
2329    }
2330
2331    fn update_tool_call_fields(
2332        &self,
2333        tool_use_id: &LanguageModelToolUseId,
2334        fields: acp::ToolCallUpdateFields,
2335    ) {
2336        self.0
2337            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2338                acp::ToolCallUpdate {
2339                    meta: None,
2340                    id: acp::ToolCallId(tool_use_id.to_string().into()),
2341                    fields,
2342                }
2343                .into(),
2344            )))
2345            .ok();
2346    }
2347
2348    fn send_retry(&self, status: acp_thread::RetryStatus) {
2349        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2350    }
2351
2352    fn send_stop(&self, reason: acp::StopReason) {
2353        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2354    }
2355
2356    fn send_canceled(&self) {
2357        self.0
2358            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2359            .ok();
2360    }
2361
2362    fn send_error(&self, error: impl Into<anyhow::Error>) {
2363        self.0.unbounded_send(Err(error.into())).ok();
2364    }
2365}
2366
2367#[derive(Clone)]
2368pub struct ToolCallEventStream {
2369    tool_use_id: LanguageModelToolUseId,
2370    stream: ThreadEventStream,
2371    fs: Option<Arc<dyn Fs>>,
2372}
2373
2374impl ToolCallEventStream {
2375    #[cfg(any(test, feature = "test-support"))]
2376    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
2377        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2378
2379        let stream = ToolCallEventStream::new("test_id".into(), ThreadEventStream(events_tx), None);
2380
2381        (stream, ToolCallEventStreamReceiver(events_rx))
2382    }
2383
2384    fn new(
2385        tool_use_id: LanguageModelToolUseId,
2386        stream: ThreadEventStream,
2387        fs: Option<Arc<dyn Fs>>,
2388    ) -> Self {
2389        Self {
2390            tool_use_id,
2391            stream,
2392            fs,
2393        }
2394    }
2395
2396    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
2397        self.stream
2398            .update_tool_call_fields(&self.tool_use_id, fields);
2399    }
2400
2401    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2402        self.stream
2403            .0
2404            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2405                acp_thread::ToolCallUpdateDiff {
2406                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2407                    diff,
2408                }
2409                .into(),
2410            )))
2411            .ok();
2412    }
2413
2414    pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
2415        if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
2416            return Task::ready(Ok(()));
2417        }
2418
2419        let (response_tx, response_rx) = oneshot::channel();
2420        self.stream
2421            .0
2422            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
2423                ToolCallAuthorization {
2424                    tool_call: acp::ToolCallUpdate {
2425                        meta: None,
2426                        id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2427                        fields: acp::ToolCallUpdateFields {
2428                            title: Some(title.into()),
2429                            ..Default::default()
2430                        },
2431                    },
2432                    options: vec![
2433                        acp::PermissionOption {
2434                            id: acp::PermissionOptionId("always_allow".into()),
2435                            name: "Always Allow".into(),
2436                            kind: acp::PermissionOptionKind::AllowAlways,
2437                            meta: None,
2438                        },
2439                        acp::PermissionOption {
2440                            id: acp::PermissionOptionId("allow".into()),
2441                            name: "Allow".into(),
2442                            kind: acp::PermissionOptionKind::AllowOnce,
2443                            meta: None,
2444                        },
2445                        acp::PermissionOption {
2446                            id: acp::PermissionOptionId("deny".into()),
2447                            name: "Deny".into(),
2448                            kind: acp::PermissionOptionKind::RejectOnce,
2449                            meta: None,
2450                        },
2451                    ],
2452                    response: response_tx,
2453                },
2454            )))
2455            .ok();
2456        let fs = self.fs.clone();
2457        cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
2458            "always_allow" => {
2459                if let Some(fs) = fs.clone() {
2460                    cx.update(|cx| {
2461                        update_settings_file(fs, cx, |settings, _| {
2462                            settings
2463                                .agent
2464                                .get_or_insert_default()
2465                                .set_always_allow_tool_actions(true);
2466                        });
2467                    })?;
2468                }
2469
2470                Ok(())
2471            }
2472            "allow" => Ok(()),
2473            _ => Err(anyhow!("Permission to run tool denied by user")),
2474        })
2475    }
2476}
2477
2478#[cfg(any(test, feature = "test-support"))]
2479pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
2480
2481#[cfg(any(test, feature = "test-support"))]
2482impl ToolCallEventStreamReceiver {
2483    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
2484        let event = self.0.next().await;
2485        if let Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) = event {
2486            auth
2487        } else {
2488            panic!("Expected ToolCallAuthorization but got: {:?}", event);
2489        }
2490    }
2491
2492    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
2493        let event = self.0.next().await;
2494        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2495            update,
2496        )))) = event
2497        {
2498            update.fields
2499        } else {
2500            panic!("Expected update fields but got: {:?}", event);
2501        }
2502    }
2503
2504    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
2505        let event = self.0.next().await;
2506        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
2507            update,
2508        )))) = event
2509        {
2510            update.diff
2511        } else {
2512            panic!("Expected diff but got: {:?}", event);
2513        }
2514    }
2515
2516    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
2517        let event = self.0.next().await;
2518        if let Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
2519            update,
2520        )))) = event
2521        {
2522            update.terminal
2523        } else {
2524            panic!("Expected terminal but got: {:?}", event);
2525        }
2526    }
2527}
2528
2529#[cfg(any(test, feature = "test-support"))]
2530impl std::ops::Deref for ToolCallEventStreamReceiver {
2531    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;
2532
2533    fn deref(&self) -> &Self::Target {
2534        &self.0
2535    }
2536}
2537
2538#[cfg(any(test, feature = "test-support"))]
2539impl std::ops::DerefMut for ToolCallEventStreamReceiver {
2540    fn deref_mut(&mut self) -> &mut Self::Target {
2541        &mut self.0
2542    }
2543}
2544
2545impl From<&str> for UserMessageContent {
2546    fn from(text: &str) -> Self {
2547        Self::Text(text.into())
2548    }
2549}
2550
2551impl UserMessageContent {
2552    pub fn from_content_block(value: acp::ContentBlock, path_style: PathStyle) -> Self {
2553        match value {
2554            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
2555            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
2556            acp::ContentBlock::Audio(_) => {
2557                // TODO
2558                Self::Text("[audio]".to_string())
2559            }
2560            acp::ContentBlock::ResourceLink(resource_link) => {
2561                match MentionUri::parse(&resource_link.uri, path_style) {
2562                    Ok(uri) => Self::Mention {
2563                        uri,
2564                        content: String::new(),
2565                    },
2566                    Err(err) => {
2567                        log::error!("Failed to parse mention link: {}", err);
2568                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
2569                    }
2570                }
2571            }
2572            acp::ContentBlock::Resource(resource) => match resource.resource {
2573                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
2574                    match MentionUri::parse(&resource.uri, path_style) {
2575                        Ok(uri) => Self::Mention {
2576                            uri,
2577                            content: resource.text,
2578                        },
2579                        Err(err) => {
2580                            log::error!("Failed to parse mention link: {}", err);
2581                            Self::Text(
2582                                MarkdownCodeBlock {
2583                                    tag: &resource.uri,
2584                                    text: &resource.text,
2585                                }
2586                                .to_string(),
2587                            )
2588                        }
2589                    }
2590                }
2591                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
2592                    // TODO
2593                    Self::Text("[blob]".to_string())
2594                }
2595            },
2596        }
2597    }
2598}
2599
2600impl From<UserMessageContent> for acp::ContentBlock {
2601    fn from(content: UserMessageContent) -> Self {
2602        match content {
2603            UserMessageContent::Text(text) => acp::ContentBlock::Text(acp::TextContent {
2604                text,
2605                annotations: None,
2606                meta: None,
2607            }),
2608            UserMessageContent::Image(image) => acp::ContentBlock::Image(acp::ImageContent {
2609                data: image.source.to_string(),
2610                mime_type: "image/png".to_string(),
2611                meta: None,
2612                annotations: None,
2613                uri: None,
2614            }),
2615            UserMessageContent::Mention { uri, content } => {
2616                acp::ContentBlock::Resource(acp::EmbeddedResource {
2617                    meta: None,
2618                    resource: acp::EmbeddedResourceResource::TextResourceContents(
2619                        acp::TextResourceContents {
2620                            meta: None,
2621                            mime_type: None,
2622                            text: content,
2623                            uri: uri.to_uri().to_string(),
2624                        },
2625                    ),
2626                    annotations: None,
2627                })
2628            }
2629        }
2630    }
2631}
2632
2633fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
2634    LanguageModelImage {
2635        source: image_content.data.into(),
2636        // TODO: make this optional?
2637        size: gpui::Size::new(0.into(), 0.into()),
2638    }
2639}