thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool, HeadTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ProjectSnapshot, ReadFileTool,
   5    SystemPromptTemplate, Template, Templates, TerminalTool, ThinkingTool, WebSearchTool,
   6};
   7use acp_thread::{MentionUri, UserMessageId};
   8use action_log::ActionLog;
   9
  10use agent_client_protocol as acp;
  11use agent_settings::{
  12    AgentProfileId, AgentProfileSettings, AgentSettings, CompletionMode,
  13    SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
  14};
  15use anyhow::{Context as _, Result, anyhow};
  16use chrono::{DateTime, Utc};
  17use client::{ModelRequestUsage, RequestUsage, UserStore};
  18use cloud_llm_client::{CompletionIntent, CompletionRequestStatus, Plan, UsageLimit};
  19use collections::{HashMap, HashSet, IndexMap};
  20use fs::Fs;
  21use futures::stream;
  22use futures::{
  23    FutureExt,
  24    channel::{mpsc, oneshot},
  25    future::Shared,
  26    stream::FuturesUnordered,
  27};
  28use gpui::{
  29    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  30};
  31use language_model::{
  32    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelExt,
  33    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  34    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  35    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  36    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage, ZED_CLOUD_PROVIDER_ID,
  37};
  38use project::Project;
  39use prompt_store::ProjectContext;
  40use schemars::{JsonSchema, Schema};
  41use serde::{Deserialize, Serialize};
  42use settings::{Settings, update_settings_file};
  43use smol::stream::StreamExt;
  44use std::{
  45    collections::BTreeMap,
  46    ops::RangeInclusive,
  47    path::Path,
  48    rc::Rc,
  49    sync::Arc,
  50    time::{Duration, Instant},
  51};
  52use std::{fmt::Write, path::PathBuf};
  53use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock, paths::PathStyle};
  54use uuid::Uuid;
  55
/// Text reported for a tool call that the user canceled.
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
/// Maximum allowed length of a tool name.
// NOTE(review): unit (bytes vs. chars) and where it is enforced are outside this view — confirm.
pub const MAX_TOOL_NAME_LENGTH: usize = 64;
  58
  59/// The ID of the user prompt that initiated a request.
  60///
  61/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
  62#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  63pub struct PromptId(Arc<str>);
  64
  65impl PromptId {
  66    pub fn new() -> Self {
  67        Self(Uuid::new_v4().to_string().into())
  68    }
  69}
  70
  71impl std::fmt::Display for PromptId {
  72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  73        write!(f, "{}", self.0)
  74    }
  75}
  76
/// Maximum number of retry attempts.
// NOTE(review): presumably applies to failed completion requests; the retry loop is
// outside this view — confirm.
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Base delay used between retry attempts.
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);

/// How a failed request should be retried.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Retry with a delay that starts at `initial_delay` and grows between attempts,
    /// bounded by `max_attempts`.
    // NOTE(review): the growth factor is applied by the caller, not visible here — confirm.
    ExponentialBackoff {
        initial_delay: Duration,
        max_attempts: u8,
    },
    /// Retry after the same `delay` every time, bounded by `max_attempts`.
    Fixed {
        delay: Duration,
        max_attempts: u8,
    },
}
  91
/// One entry in a thread's conversation history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// Content submitted by the user.
    User(UserMessage),
    /// A response from the model, including its tool uses and their results.
    Agent(AgentMessage),
    /// Marker asking the model to continue. It is sent to the model as a user message
    /// reading "Continue where you left off".
    Resume,
}
  98
  99impl Message {
 100    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 101        match self {
 102            Message::Agent(agent_message) => Some(agent_message),
 103            _ => None,
 104        }
 105    }
 106
 107    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 108        match self {
 109            Message::User(message) => vec![message.to_request()],
 110            Message::Agent(message) => message.to_request(),
 111            Message::Resume => vec![LanguageModelRequestMessage {
 112                role: Role::User,
 113                content: vec!["Continue where you left off".into()],
 114                cache: false,
 115            }],
 116        }
 117    }
 118
 119    pub fn to_markdown(&self) -> String {
 120        match self {
 121            Message::User(message) => message.to_markdown(),
 122            Message::Agent(message) => message.to_markdown(),
 123            Message::Resume => "[resume]\n".into(),
 124        }
 125    }
 126
 127    pub fn role(&self) -> Role {
 128        match self {
 129            Message::User(_) | Message::Resume => Role::User,
 130            Message::Agent(_) => Role::Assistant,
 131        }
 132    }
 133}
 134
/// A message submitted by the user.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this user message.
    pub id: UserMessageId,
    /// The message's content chunks, in the order they appear.
    pub content: Vec<UserMessageContent>,
}

/// One chunk of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text.
    Text(String),
    /// A reference to a resource (file, directory, symbol, selection, thread, rule,
    /// URL, ...) together with that resource's content. `content` may be empty.
    Mention { uri: MentionUri, content: String },
    /// An attached image.
    Image(LanguageModelImage),
}
 147
 148impl UserMessage {
 149    pub fn to_markdown(&self) -> String {
 150        let mut markdown = String::from("## User\n\n");
 151
 152        for content in &self.content {
 153            match content {
 154                UserMessageContent::Text(text) => {
 155                    markdown.push_str(text);
 156                    markdown.push('\n');
 157                }
 158                UserMessageContent::Image(_) => {
 159                    markdown.push_str("<image />\n");
 160                }
 161                UserMessageContent::Mention { uri, content } => {
 162                    if !content.is_empty() {
 163                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 164                    } else {
 165                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 166                    }
 167                }
 168            }
 169        }
 170
 171        markdown
 172    }
 173
 174    fn to_request(&self) -> LanguageModelRequestMessage {
 175        let mut message = LanguageModelRequestMessage {
 176            role: Role::User,
 177            content: Vec::with_capacity(self.content.len()),
 178            cache: false,
 179        };
 180
 181        const OPEN_CONTEXT: &str = "<context>\n\
 182            The following items were attached by the user. \
 183            They are up-to-date and don't need to be re-read.\n\n";
 184
 185        const OPEN_FILES_TAG: &str = "<files>";
 186        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 187        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 188        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 189        const OPEN_THREADS_TAG: &str = "<threads>";
 190        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 191        const OPEN_RULES_TAG: &str =
 192            "<rules>\nThe user has specified the following rules that should be applied:\n";
 193
 194        let mut file_context = OPEN_FILES_TAG.to_string();
 195        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 196        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 197        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 198        let mut thread_context = OPEN_THREADS_TAG.to_string();
 199        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 200        let mut rules_context = OPEN_RULES_TAG.to_string();
 201
 202        for chunk in &self.content {
 203            let chunk = match chunk {
 204                UserMessageContent::Text(text) => {
 205                    language_model::MessageContent::Text(text.clone())
 206                }
 207                UserMessageContent::Image(value) => {
 208                    language_model::MessageContent::Image(value.clone())
 209                }
 210                UserMessageContent::Mention { uri, content } => {
 211                    match uri {
 212                        MentionUri::File { abs_path } => {
 213                            write!(
 214                                &mut file_context,
 215                                "\n{}",
 216                                MarkdownCodeBlock {
 217                                    tag: &codeblock_tag(abs_path, None),
 218                                    text: &content.to_string(),
 219                                }
 220                            )
 221                            .ok();
 222                        }
 223                        MentionUri::PastedImage => {
 224                            debug_panic!("pasted image URI should not be used in mention content")
 225                        }
 226                        MentionUri::Directory { .. } => {
 227                            write!(&mut directory_context, "\n{}\n", content).ok();
 228                        }
 229                        MentionUri::Symbol {
 230                            abs_path: path,
 231                            line_range,
 232                            ..
 233                        } => {
 234                            write!(
 235                                &mut symbol_context,
 236                                "\n{}",
 237                                MarkdownCodeBlock {
 238                                    tag: &codeblock_tag(path, Some(line_range)),
 239                                    text: content
 240                                }
 241                            )
 242                            .ok();
 243                        }
 244                        MentionUri::Selection {
 245                            abs_path: path,
 246                            line_range,
 247                            ..
 248                        } => {
 249                            write!(
 250                                &mut selection_context,
 251                                "\n{}",
 252                                MarkdownCodeBlock {
 253                                    tag: &codeblock_tag(
 254                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 255                                        Some(line_range)
 256                                    ),
 257                                    text: content
 258                                }
 259                            )
 260                            .ok();
 261                        }
 262                        MentionUri::Thread { .. } => {
 263                            write!(&mut thread_context, "\n{}\n", content).ok();
 264                        }
 265                        MentionUri::TextThread { .. } => {
 266                            write!(&mut thread_context, "\n{}\n", content).ok();
 267                        }
 268                        MentionUri::Rule { .. } => {
 269                            write!(
 270                                &mut rules_context,
 271                                "\n{}",
 272                                MarkdownCodeBlock {
 273                                    tag: "",
 274                                    text: content
 275                                }
 276                            )
 277                            .ok();
 278                        }
 279                        MentionUri::Fetch { url } => {
 280                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 281                        }
 282                    }
 283
 284                    language_model::MessageContent::Text(uri.as_link().to_string())
 285                }
 286            };
 287
 288            message.content.push(chunk);
 289        }
 290
 291        let len_before_context = message.content.len();
 292
 293        if file_context.len() > OPEN_FILES_TAG.len() {
 294            file_context.push_str("</files>\n");
 295            message
 296                .content
 297                .push(language_model::MessageContent::Text(file_context));
 298        }
 299
 300        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 301            directory_context.push_str("</directories>\n");
 302            message
 303                .content
 304                .push(language_model::MessageContent::Text(directory_context));
 305        }
 306
 307        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 308            symbol_context.push_str("</symbols>\n");
 309            message
 310                .content
 311                .push(language_model::MessageContent::Text(symbol_context));
 312        }
 313
 314        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 315            selection_context.push_str("</selections>\n");
 316            message
 317                .content
 318                .push(language_model::MessageContent::Text(selection_context));
 319        }
 320
 321        if thread_context.len() > OPEN_THREADS_TAG.len() {
 322            thread_context.push_str("</threads>\n");
 323            message
 324                .content
 325                .push(language_model::MessageContent::Text(thread_context));
 326        }
 327
 328        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 329            fetch_context.push_str("</fetched_urls>\n");
 330            message
 331                .content
 332                .push(language_model::MessageContent::Text(fetch_context));
 333        }
 334
 335        if rules_context.len() > OPEN_RULES_TAG.len() {
 336            rules_context.push_str("</user_rules>\n");
 337            message
 338                .content
 339                .push(language_model::MessageContent::Text(rules_context));
 340        }
 341
 342        if message.content.len() > len_before_context {
 343            message.content.insert(
 344                len_before_context,
 345                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 346            );
 347            message
 348                .content
 349                .push(language_model::MessageContent::Text("</context>".into()));
 350        }
 351
 352        message
 353    }
 354}
 355
 356fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 357    let mut result = String::new();
 358
 359    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 360        let _ = write!(result, "{} ", extension);
 361    }
 362
 363    let _ = write!(result, "{}", full_path.display());
 364
 365    if let Some(range) = line_range {
 366        if range.start() == range.end() {
 367            let _ = write!(result, ":{}", range.start() + 1);
 368        } else {
 369            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 370        }
 371    }
 372
 373    result
 374}
 375
 376impl AgentMessage {
 377    pub fn to_markdown(&self) -> String {
 378        let mut markdown = String::from("## Assistant\n\n");
 379
 380        for content in &self.content {
 381            match content {
 382                AgentMessageContent::Text(text) => {
 383                    markdown.push_str(text);
 384                    markdown.push('\n');
 385                }
 386                AgentMessageContent::Thinking { text, .. } => {
 387                    markdown.push_str("<think>");
 388                    markdown.push_str(text);
 389                    markdown.push_str("</think>\n");
 390                }
 391                AgentMessageContent::RedactedThinking(_) => {
 392                    markdown.push_str("<redacted_thinking />\n")
 393                }
 394                AgentMessageContent::ToolUse(tool_use) => {
 395                    markdown.push_str(&format!(
 396                        "**Tool Use**: {} (ID: {})\n",
 397                        tool_use.name, tool_use.id
 398                    ));
 399                    markdown.push_str(&format!(
 400                        "{}\n",
 401                        MarkdownCodeBlock {
 402                            tag: "json",
 403                            text: &format!("{:#}", tool_use.input)
 404                        }
 405                    ));
 406                }
 407            }
 408        }
 409
 410        for tool_result in self.tool_results.values() {
 411            markdown.push_str(&format!(
 412                "**Tool Result**: {} (ID: {})\n\n",
 413                tool_result.tool_name, tool_result.tool_use_id
 414            ));
 415            if tool_result.is_error {
 416                markdown.push_str("**ERROR:**\n");
 417            }
 418
 419            match &tool_result.content {
 420                LanguageModelToolResultContent::Text(text) => {
 421                    writeln!(markdown, "{text}\n").ok();
 422                }
 423                LanguageModelToolResultContent::Image(_) => {
 424                    writeln!(markdown, "<image />\n").ok();
 425                }
 426            }
 427
 428            if let Some(output) = tool_result.output.as_ref() {
 429                writeln!(
 430                    markdown,
 431                    "**Debug Output**:\n\n```json\n{}\n```\n",
 432                    serde_json::to_string_pretty(output).unwrap()
 433                )
 434                .unwrap();
 435            }
 436        }
 437
 438        markdown
 439    }
 440
 441    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 442        let mut assistant_message = LanguageModelRequestMessage {
 443            role: Role::Assistant,
 444            content: Vec::with_capacity(self.content.len()),
 445            cache: false,
 446        };
 447        for chunk in &self.content {
 448            match chunk {
 449                AgentMessageContent::Text(text) => {
 450                    assistant_message
 451                        .content
 452                        .push(language_model::MessageContent::Text(text.clone()));
 453                }
 454                AgentMessageContent::Thinking { text, signature } => {
 455                    assistant_message
 456                        .content
 457                        .push(language_model::MessageContent::Thinking {
 458                            text: text.clone(),
 459                            signature: signature.clone(),
 460                        });
 461                }
 462                AgentMessageContent::RedactedThinking(value) => {
 463                    assistant_message.content.push(
 464                        language_model::MessageContent::RedactedThinking(value.clone()),
 465                    );
 466                }
 467                AgentMessageContent::ToolUse(tool_use) => {
 468                    if self.tool_results.contains_key(&tool_use.id) {
 469                        assistant_message
 470                            .content
 471                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 472                    }
 473                }
 474            };
 475        }
 476
 477        let mut user_message = LanguageModelRequestMessage {
 478            role: Role::User,
 479            content: Vec::new(),
 480            cache: false,
 481        };
 482
 483        for tool_result in self.tool_results.values() {
 484            let mut tool_result = tool_result.clone();
 485            // Surprisingly, the API fails if we return an empty string here.
 486            // It thinks we are sending a tool use without a tool result.
 487            if tool_result.content.is_empty() {
 488                tool_result.content = "<Tool returned an empty string>".into();
 489            }
 490            user_message
 491                .content
 492                .push(language_model::MessageContent::ToolResult(tool_result));
 493        }
 494
 495        let mut messages = Vec::new();
 496        if !assistant_message.content.is_empty() {
 497            messages.push(assistant_message);
 498        }
 499        if !user_message.content.is_empty() {
 500            messages.push(user_message);
 501        }
 502        messages
 503    }
 504}
 505
/// A response produced by the model.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// The response chunks, in the order they appear.
    pub content: Vec<AgentMessageContent>,
    /// Results of the tool uses in `content`, keyed by tool use id.
    ///
    /// When a request is built, a tool use with no entry here is left out.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
}

/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Response text.
    Text(String),
    /// Reasoning text, with an optional signature that is sent back to the model
    /// unchanged.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Redacted reasoning. It is kept opaque and sent back to the model as-is.
    RedactedThinking(String),
    /// A request by the model to invoke a tool.
    ToolUse(LanguageModelToolUse),
}
 522
/// A handle to a terminal created on behalf of the thread.
pub trait TerminalHandle {
    /// Returns the ACP identifier of the terminal.
    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
    /// Returns the terminal's current output.
    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
    /// Returns a shareable task that resolves to the terminal's exit status.
    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
}

/// Services the thread needs from its host environment.
pub trait ThreadEnvironment {
    /// Creates a terminal that runs `command` and resolves to a handle for it.
    ///
    /// `cwd` optionally sets the working directory. `output_byte_limit`
    /// optionally limits the output size in bytes; how it is enforced is up to
    /// the implementation.
    fn create_terminal(
        &self,
        command: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
}
 538
/// Events emitted by a thread, e.g. over the channel returned by `Thread::replay`.
#[derive(Debug)]
pub enum ThreadEvent {
    /// A user message.
    UserMessage(UserMessage),
    /// Assistant response text.
    AgentText(String),
    /// Assistant reasoning text.
    AgentThinking(String),
    /// A newly announced tool call.
    ToolCall(acp::ToolCall),
    /// An update to a previously announced tool call.
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    /// A request to authorize a tool call.
    ToolCallAuthorization(ToolCallAuthorization),
    /// A retry of the current request.
    Retry(acp_thread::RetryStatus),
    /// The turn stopped, for the given reason.
    Stop(acp::StopReason),
}

/// A request to create a terminal.
#[derive(Debug)]
pub struct NewTerminal {
    /// Command to run.
    pub command: String,
    /// Optional output size limit, in bytes.
    pub output_byte_limit: Option<u64>,
    /// Optional working directory.
    pub cwd: Option<PathBuf>,
    /// Channel on which the created terminal entity (or an error) is delivered.
    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
}

/// A request to authorize a tool call.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call that needs authorization.
    pub tool_call: acp::ToolCallUpdate,
    /// The permission options offered.
    pub options: Vec<acp::PermissionOption>,
    /// Channel on which the id of the chosen option is returned.
    pub response: oneshot::Sender<acp::PermissionOptionId>,
}

/// Ways a completion request can end unsuccessfully.
#[derive(Debug, thiserror::Error)]
enum CompletionError {
    /// The token limit was reached.
    #[error("max tokens")]
    MaxTokens,
    /// The model refused to respond.
    #[error("refusal")]
    Refusal,
    /// Any other failure; its message is forwarded unchanged.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
 575
/// An agent conversation.
///
/// It holds the message history, the tools and models in use, and the state of
/// any in-progress turn.
pub struct Thread {
    /// ACP session identifier of this thread.
    id: acp::SessionId,
    /// ID of the user prompt that initiated the current request.
    prompt_id: PromptId,
    /// When the thread was last updated.
    updated_at: DateTime<Utc>,
    /// Thread title. It is `None` until set; an empty persisted title loads as `None`.
    title: Option<SharedString>,
    /// In-flight title generation task, if any.
    pending_title_generation: Option<Task<()>>,
    /// In-flight summary generation, shared so several callers can await it.
    pending_summary_generation: Option<Shared<Task<Option<SharedString>>>>,
    /// Detailed summary of the thread, if one has been generated or loaded.
    summary: Option<SharedString>,
    /// The conversation history, in conversation order.
    messages: Vec<Message>,
    user_store: Entity<UserStore>,
    /// Completion mode used for this thread's requests.
    completion_mode: CompletionMode,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// Assistant message for the current turn.
    // NOTE(review): presumably not yet appended to `messages` — confirm.
    pending_message: Option<AgentMessage>,
    /// Agent tools, keyed by tool name.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    tool_use_limit_reached: bool,
    /// Token usage recorded per user message.
    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
    // Restored from the database by `from_db`, but not read anywhere in this view.
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    // Captured when the thread is created or loaded, but not read anywhere in this view.
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Registry of context servers.
    ///
    /// It is consulted for tools not found in `tools`.
    context_server_registry: Entity<ContextServerRegistry>,
    /// Agent profile selected for this thread.
    profile_id: AgentProfileId,
    project_context: Entity<ProjectContext>,
    templates: Arc<Templates>,
    /// Model used for completions, if one is selected.
    model: Option<Arc<dyn LanguageModel>>,
    /// Model used for summarization, if one is selected.
    summarization_model: Option<Arc<dyn LanguageModel>>,
    /// Sender half of the prompt-capabilities watch channel.
    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
    /// Receiver half of the prompt-capabilities watch channel.
    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
    pub(crate) project: Entity<Project>,
    pub(crate) action_log: Entity<ActionLog>,
}
 610
 611impl Thread {
 612    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 613        let image = model.map_or(true, |model| model.supports_images());
 614        acp::PromptCapabilities {
 615            meta: None,
 616            image,
 617            audio: false,
 618            embedded_context: true,
 619        }
 620    }
 621
 622    pub fn new(
 623        project: Entity<Project>,
 624        project_context: Entity<ProjectContext>,
 625        context_server_registry: Entity<ContextServerRegistry>,
 626        templates: Arc<Templates>,
 627        model: Option<Arc<dyn LanguageModel>>,
 628        cx: &mut Context<Self>,
 629    ) -> Self {
 630        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 631        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
 632        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 633            watch::channel(Self::prompt_capabilities(model.as_deref()));
 634        Self {
 635            id: acp::SessionId(uuid::Uuid::new_v4().to_string().into()),
 636            prompt_id: PromptId::new(),
 637            updated_at: Utc::now(),
 638            title: None,
 639            pending_title_generation: None,
 640            pending_summary_generation: None,
 641            summary: None,
 642            messages: Vec::new(),
 643            user_store: project.read(cx).user_store(),
 644            completion_mode: AgentSettings::get_global(cx).preferred_completion_mode,
 645            running_turn: None,
 646            pending_message: None,
 647            tools: BTreeMap::default(),
 648            tool_use_limit_reached: false,
 649            request_token_usage: HashMap::default(),
 650            cumulative_token_usage: TokenUsage::default(),
 651            initial_project_snapshot: {
 652                let project_snapshot = Self::project_snapshot(project.clone(), cx);
 653                cx.foreground_executor()
 654                    .spawn(async move { Some(project_snapshot.await) })
 655                    .shared()
 656            },
 657            context_server_registry,
 658            profile_id,
 659            project_context,
 660            templates,
 661            model,
 662            summarization_model: None,
 663            prompt_capabilities_tx,
 664            prompt_capabilities_rx,
 665            project,
 666            action_log,
 667        }
 668    }
 669
    /// Returns the ACP session id that identifies this thread.
    pub fn id(&self) -> &acp::SessionId {
        &self.id
    }
 673
 674    pub fn replay(
 675        &mut self,
 676        cx: &mut Context<Self>,
 677    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 678        let (tx, rx) = mpsc::unbounded();
 679        let stream = ThreadEventStream(tx);
 680        for message in &self.messages {
 681            match message {
 682                Message::User(user_message) => stream.send_user_message(user_message),
 683                Message::Agent(assistant_message) => {
 684                    for content in &assistant_message.content {
 685                        match content {
 686                            AgentMessageContent::Text(text) => stream.send_text(text),
 687                            AgentMessageContent::Thinking { text, .. } => {
 688                                stream.send_thinking(text)
 689                            }
 690                            AgentMessageContent::RedactedThinking(_) => {}
 691                            AgentMessageContent::ToolUse(tool_use) => {
 692                                self.replay_tool_call(
 693                                    tool_use,
 694                                    assistant_message.tool_results.get(&tool_use.id),
 695                                    &stream,
 696                                    cx,
 697                                );
 698                            }
 699                        }
 700                    }
 701                }
 702                Message::Resume => {}
 703            }
 704        }
 705        rx
 706    }
 707
 708    fn replay_tool_call(
 709        &self,
 710        tool_use: &LanguageModelToolUse,
 711        tool_result: Option<&LanguageModelToolResult>,
 712        stream: &ThreadEventStream,
 713        cx: &mut Context<Self>,
 714    ) {
 715        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 716            self.context_server_registry
 717                .read(cx)
 718                .servers()
 719                .find_map(|(_, tools)| {
 720                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 721                        Some(tool.clone())
 722                    } else {
 723                        None
 724                    }
 725                })
 726        });
 727
 728        let Some(tool) = tool else {
 729            stream
 730                .0
 731                .unbounded_send(Ok(ThreadEvent::ToolCall(acp::ToolCall {
 732                    meta: None,
 733                    id: acp::ToolCallId(tool_use.id.to_string().into()),
 734                    title: tool_use.name.to_string(),
 735                    kind: acp::ToolKind::Other,
 736                    status: acp::ToolCallStatus::Failed,
 737                    content: Vec::new(),
 738                    locations: Vec::new(),
 739                    raw_input: Some(tool_use.input.clone()),
 740                    raw_output: None,
 741                })))
 742                .ok();
 743            return;
 744        };
 745
 746        let title = tool.initial_title(tool_use.input.clone(), cx);
 747        let kind = tool.kind();
 748        stream.send_tool_call(
 749            &tool_use.id,
 750            &tool_use.name,
 751            title,
 752            kind,
 753            tool_use.input.clone(),
 754        );
 755
 756        let output = tool_result
 757            .as_ref()
 758            .and_then(|result| result.output.clone());
 759        if let Some(output) = output.clone() {
 760            let tool_event_stream = ToolCallEventStream::new(
 761                tool_use.id.clone(),
 762                stream.clone(),
 763                Some(self.project.read(cx).fs().clone()),
 764            );
 765            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 766                .log_err();
 767        }
 768
 769        stream.update_tool_call_fields(
 770            &tool_use.id,
 771            acp::ToolCallUpdateFields {
 772                status: Some(
 773                    tool_result
 774                        .as_ref()
 775                        .map_or(acp::ToolCallStatus::Failed, |result| {
 776                            if result.is_error {
 777                                acp::ToolCallStatus::Failed
 778                            } else {
 779                                acp::ToolCallStatus::Completed
 780                            }
 781                        }),
 782                ),
 783                raw_output: output,
 784                ..Default::default()
 785            },
 786        );
 787    }
 788
    /// Rebuilds a thread from its persisted [`DbThread`] representation.
    ///
    /// A missing stored profile falls back to the global default profile. A stored model that is
    /// absent or cannot be selected falls back to the registry's default model. Runtime-only
    /// state (running turn, pending message, registered tools, pending title/summary generation,
    /// summarization model) starts out empty, and a fresh `PromptId` is generated.
    pub fn from_db(
        id: acp::SessionId,
        db_thread: DbThread,
        project: Entity<Project>,
        project_context: Entity<ProjectContext>,
        context_server_registry: Entity<ContextServerRegistry>,
        templates: Arc<Templates>,
        cx: &mut Context<Self>,
    ) -> Self {
        let profile_id = db_thread
            .profile
            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
        // Prefer the persisted model; fall back to the registry default if it can't be selected.
        let model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
            db_thread
                .model
                .and_then(|model| {
                    let model = SelectedModel {
                        provider: model.provider.clone().into(),
                        model: model.model.into(),
                    };
                    registry.select_model(&model, cx)
                })
                .or_else(|| registry.default_model())
                .map(|model| model.model)
        });
        // Seed the capabilities channel from whichever model was restored.
        let (prompt_capabilities_tx, prompt_capabilities_rx) =
            watch::channel(Self::prompt_capabilities(model.as_deref()));

        let action_log = cx.new(|_| ActionLog::new(project.clone()));

        Self {
            id,
            prompt_id: PromptId::new(),
            // An empty persisted title is treated as "no title yet".
            title: if db_thread.title.is_empty() {
                None
            } else {
                Some(db_thread.title.clone())
            },
            pending_title_generation: None,
            pending_summary_generation: None,
            summary: db_thread.detailed_summary,
            messages: db_thread.messages,
            user_store: project.read(cx).user_store(),
            completion_mode: db_thread.completion_mode.unwrap_or_default(),
            running_turn: None,
            pending_message: None,
            tools: BTreeMap::default(),
            tool_use_limit_reached: false,
            request_token_usage: db_thread.request_token_usage.clone(),
            cumulative_token_usage: db_thread.cumulative_token_usage,
            // The snapshot was captured when the thread was saved; wrap it in an already
            // resolved shared task.
            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
            context_server_registry,
            profile_id,
            project_context,
            templates,
            model,
            summarization_model: None,
            project,
            action_log,
            updated_at: db_thread.updated_at,
            prompt_capabilities_tx,
            prompt_capabilities_rx,
        }
    }
 853
 854    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
 855        let initial_project_snapshot = self.initial_project_snapshot.clone();
 856        let mut thread = DbThread {
 857            title: self.title(),
 858            messages: self.messages.clone(),
 859            updated_at: self.updated_at,
 860            detailed_summary: self.summary.clone(),
 861            initial_project_snapshot: None,
 862            cumulative_token_usage: self.cumulative_token_usage,
 863            request_token_usage: self.request_token_usage.clone(),
 864            model: self.model.as_ref().map(|model| DbLanguageModel {
 865                provider: model.provider_id().to_string(),
 866                model: model.name().0.to_string(),
 867            }),
 868            completion_mode: Some(self.completion_mode),
 869            profile: Some(self.profile_id.clone()),
 870        };
 871
 872        cx.background_spawn(async move {
 873            let initial_project_snapshot = initial_project_snapshot.await;
 874            thread.initial_project_snapshot = initial_project_snapshot;
 875            thread
 876        })
 877    }
 878
 879    /// Create a snapshot of the current project state including git information and unsaved buffers.
 880    fn project_snapshot(
 881        project: Entity<Project>,
 882        cx: &mut Context<Self>,
 883    ) -> Task<Arc<ProjectSnapshot>> {
 884        let task = project::telemetry_snapshot::TelemetrySnapshot::new(&project, cx);
 885        cx.spawn(async move |_, _| {
 886            let snapshot = task.await;
 887
 888            Arc::new(ProjectSnapshot {
 889                worktree_snapshots: snapshot.worktree_snapshots,
 890                timestamp: Utc::now(),
 891            })
 892        })
 893    }
 894
    /// Returns the project context entity this thread was created with.
    pub fn project_context(&self) -> &Entity<ProjectContext> {
        &self.project_context
    }
 898
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 902
    /// Returns this thread's [`ActionLog`] entity, which is also handed to tools that need it.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 906
 907    pub fn is_empty(&self) -> bool {
 908        self.messages.is_empty() && self.title.is_none()
 909    }
 910
    /// Returns the language model used for completions, if one is configured.
    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
        self.model.as_ref()
    }
 914
 915    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
 916        let old_usage = self.latest_token_usage();
 917        self.model = Some(model);
 918        let new_caps = Self::prompt_capabilities(self.model.as_deref());
 919        let new_usage = self.latest_token_usage();
 920        if old_usage != new_usage {
 921            cx.emit(TokenUsageUpdated(new_usage));
 922        }
 923        self.prompt_capabilities_tx.send(new_caps).log_err();
 924        cx.notify()
 925    }
 926
    /// Returns the model used to generate thread summaries, if one is set.
    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
        self.summarization_model.as_ref()
    }
 930
    /// Sets (or clears, with `None`) the model used to generate thread summaries.
    pub fn set_summarization_model(
        &mut self,
        model: Option<Arc<dyn LanguageModel>>,
        cx: &mut Context<Self>,
    ) {
        self.summarization_model = model;
        cx.notify()
    }
 939
    /// Returns the current completion mode.
    pub fn completion_mode(&self) -> CompletionMode {
        self.completion_mode
    }
 943
 944    pub fn set_completion_mode(&mut self, mode: CompletionMode, cx: &mut Context<Self>) {
 945        let old_usage = self.latest_token_usage();
 946        self.completion_mode = mode;
 947        let new_usage = self.latest_token_usage();
 948        if old_usage != new_usage {
 949            cx.emit(TokenUsageUpdated(new_usage));
 950        }
 951        cx.notify()
 952    }
 953
 954    #[cfg(any(test, feature = "test-support"))]
 955    pub fn last_message(&self) -> Option<Message> {
 956        if let Some(message) = self.pending_message.clone() {
 957            Some(Message::Agent(message))
 958        } else {
 959            self.messages.last().cloned()
 960        }
 961    }
 962
 963    pub fn add_default_tools(
 964        &mut self,
 965        environment: Rc<dyn ThreadEnvironment>,
 966        cx: &mut Context<Self>,
 967    ) {
 968        let language_registry = self.project.read(cx).languages().clone();
 969        self.add_tool(CopyPathTool::new(self.project.clone()));
 970        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
 971        self.add_tool(DeletePathTool::new(
 972            self.project.clone(),
 973            self.action_log.clone(),
 974        ));
 975        self.add_tool(DiagnosticsTool::new(self.project.clone()));
 976        self.add_tool(EditFileTool::new(
 977            self.project.clone(),
 978            cx.weak_entity(),
 979            language_registry,
 980            Templates::new(),
 981        ));
 982        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
 983        self.add_tool(FindPathTool::new(self.project.clone()));
 984        self.add_tool(GrepTool::new(self.project.clone()));
 985        self.add_tool(HeadTool::new(self.project.clone()));
 986        self.add_tool(ListDirectoryTool::new(self.project.clone()));
 987        self.add_tool(MovePathTool::new(self.project.clone()));
 988        self.add_tool(NowTool);
 989        self.add_tool(OpenTool::new(self.project.clone()));
 990        self.add_tool(ReadFileTool::new(
 991            self.project.clone(),
 992            self.action_log.clone(),
 993        ));
 994        self.add_tool(TerminalTool::new(self.project.clone(), environment));
 995        self.add_tool(ThinkingTool);
 996        self.add_tool(WebSearchTool);
 997    }
 998
    /// Registers `tool` under `T::name()`, replacing any tool already registered under that name.
    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
        self.tools.insert(T::name().into(), tool.erase());
    }
1002
    /// Unregisters the tool named `name`, returning whether such a tool was registered.
    pub fn remove_tool(&mut self, name: &str) -> bool {
        self.tools.remove(name).is_some()
    }
1006
    /// Returns the id of the agent profile that `run_turn` looks up when starting a turn.
    pub fn profile(&self) -> &AgentProfileId {
        &self.profile_id
    }
1010
    /// Sets the agent profile.
    ///
    /// The profile is read when a turn starts, so this affects the next turn, not one already
    /// running. No notification is emitted.
    pub fn set_profile(&mut self, profile_id: AgentProfileId) {
        self.profile_id = profile_id;
    }
1014
1015    pub fn cancel(&mut self, cx: &mut Context<Self>) {
1016        if let Some(running_turn) = self.running_turn.take() {
1017            running_turn.cancel();
1018        }
1019        self.flush_pending_message(cx);
1020    }
1021
1022    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1023        let Some(last_user_message) = self.last_user_message() else {
1024            return;
1025        };
1026
1027        self.request_token_usage
1028            .insert(last_user_message.id.clone(), update);
1029        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1030        cx.notify();
1031    }
1032
1033    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1034        self.cancel(cx);
1035        let Some(position) = self.messages.iter().position(
1036            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1037        ) else {
1038            return Err(anyhow!("Message not found"));
1039        };
1040
1041        for message in self.messages.drain(position..) {
1042            match message {
1043                Message::User(message) => {
1044                    self.request_token_usage.remove(&message.id);
1045                }
1046                Message::Agent(_) | Message::Resume => {}
1047            }
1048        }
1049        self.clear_summary();
1050        cx.notify();
1051        Ok(())
1052    }
1053
1054    pub fn latest_request_token_usage(&self) -> Option<language_model::TokenUsage> {
1055        let last_user_message = self.last_user_message()?;
1056        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1057        Some(*tokens)
1058    }
1059
1060    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1061        let usage = self.latest_request_token_usage()?;
1062        let model = self.model.clone()?;
1063        Some(acp_thread::TokenUsage {
1064            max_tokens: model.max_token_count_for_mode(self.completion_mode.into()),
1065            used_tokens: usage.total_tokens(),
1066        })
1067    }
1068
1069    pub fn resume(
1070        &mut self,
1071        cx: &mut Context<Self>,
1072    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1073        self.messages.push(Message::Resume);
1074        cx.notify();
1075
1076        log::debug!("Total messages in thread: {}", self.messages.len());
1077        self.run_turn(cx)
1078    }
1079
1080    /// Sending a message results in the model streaming a response, which could include tool calls.
1081    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1082    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1083    pub fn send<T>(
1084        &mut self,
1085        id: UserMessageId,
1086        content: impl IntoIterator<Item = T>,
1087        cx: &mut Context<Self>,
1088    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1089    where
1090        T: Into<UserMessageContent>,
1091    {
1092        let model = self.model().context("No language model configured")?;
1093
1094        log::info!("Thread::send called with model: {}", model.name().0);
1095        self.advance_prompt_id();
1096
1097        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1098        log::debug!("Thread::send content: {:?}", content);
1099
1100        self.messages
1101            .push(Message::User(UserMessage { id, content }));
1102        cx.notify();
1103
1104        log::debug!("Total messages in thread: {}", self.messages.len());
1105        self.run_turn(cx)
1106    }
1107
    /// Starts another turn over the existing history without adding a message.
    ///
    /// Only compiled with the `eval` feature.
    #[cfg(feature = "eval")]
    pub fn proceed(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.run_turn(cx)
    }
1115
    /// Starts a new agent turn over the current message history.
    ///
    /// Cancels any turn already in progress and resolves the configured model and the active
    /// profile. The tools that profile enables are captured for this turn. It then spawns a task
    /// that drives `run_turn_internal`. The returned receiver gets the turn's [`ThreadEvent`]s,
    /// ending with either a stop reason or an error.
    ///
    /// # Errors
    /// Returns an error if no language model is configured, or if the active profile is not
    /// present in the agent settings.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.cancel(cx);

        let model = self.model.clone().context("No language model configured")?;
        let profile = AgentSettings::get_global(cx)
            .profiles
            .get(&self.profile_id)
            .context("Profile not found")?;
        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
        let event_stream = ThreadEventStream(events_tx);
        // Index of the last message at turn start. On refusal, history is truncated back to it,
        // which drops that message and everything produced after it.
        let message_ix = self.messages.len().saturating_sub(1);
        self.tool_use_limit_reached = false;
        self.clear_summary();
        self.running_turn = Some(RunningTurn {
            event_stream: event_stream.clone(),
            tools: self.enabled_tools(profile, &model, cx),
            _task: cx.spawn(async move |this, cx| {
                log::debug!("Starting agent turn execution");

                let turn_result = Self::run_turn_internal(&this, model, &event_stream, cx).await;
                // Flush the pending message regardless of how the turn ended.
                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));

                // Map the outcome onto an ACP stop reason, or forward the error.
                match turn_result {
                    Ok(()) => {
                        log::debug!("Turn execution completed");
                        event_stream.send_stop(acp::StopReason::EndTurn);
                    }
                    Err(error) => {
                        log::error!("Turn execution failed: {:?}", error);
                        match error.downcast::<CompletionError>() {
                            Ok(CompletionError::Refusal) => {
                                event_stream.send_stop(acp::StopReason::Refusal);
                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
                            }
                            Ok(CompletionError::MaxTokens) => {
                                event_stream.send_stop(acp::StopReason::MaxTokens);
                            }
                            Ok(CompletionError::Other(error)) | Err(error) => {
                                event_stream.send_error(error);
                            }
                        }
                    }
                }

                // The turn is over; drop the thread's handle to it.
                _ = this.update(cx, |this, _| this.running_turn.take());
            }),
        });
        Ok(events_rx)
    }
1168
    /// The agentic loop behind a single turn.
    ///
    /// Each iteration does the following:
    /// 1. Builds a completion request.
    /// 2. Streams the model's response, dispatching each event through
    ///    `handle_completion_event`.
    /// 3. Waits for every tool task started by that response and stores the results on the
    ///    pending message.
    /// 4. Flushes the pending message.
    ///
    /// After that:
    /// - on a stream error, asks `handle_completion_error` whether to retry and, if so, waits
    ///   out the retry delay and loops;
    /// - if the tool-use limit was reached, returns `ToolUseLimitReachedError`;
    /// - if the response produced no tool tasks, the turn ends successfully;
    /// - otherwise, loops to send the tool results back to the model.
    ///
    /// # Errors
    /// Propagates errors from `handle_completion_event` (e.g. refusals), non-retryable or
    /// exhausted completion errors, and failures to access the (possibly released) thread.
    async fn run_turn_internal(
        this: &WeakEntity<Self>,
        model: Arc<dyn LanguageModel>,
        event_stream: &ThreadEventStream,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        // Number of consecutive failed attempts; reset after each successful tool round trip.
        let mut attempt = 0;
        let mut intent = CompletionIntent::UserPrompt;
        loop {
            let request =
                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;

            telemetry::event!(
                "Agent Thread Completion",
                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
                model = model.telemetry_id(),
                model_provider = model.provider_id().to_string(),
                attempt
            );

            log::debug!("Calling model.stream_completion, attempt {}", attempt);

            // A failed request is treated like an error before any events: an empty stream plus
            // an error that goes through the retry logic below.
            let (mut events, mut error) = match model.stream_completion(request, cx).await {
                Ok(events) => (events, None),
                Err(err) => (stream::empty().boxed(), Some(err)),
            };
            let mut tool_results = FuturesUnordered::new();
            while let Some(event) = events.next().await {
                log::trace!("Received completion event: {:?}", event);
                match event {
                    Ok(event) => {
                        tool_results.extend(this.update(cx, |this, cx| {
                            this.handle_completion_event(event, event_stream, cx)
                        })??);
                    }
                    Err(err) => {
                        error = Some(err);
                        break;
                    }
                }
            }

            // Decided before draining: no tool tasks means the model is done for this turn.
            let end_turn = tool_results.is_empty();
            while let Some(tool_result) = tool_results.next().await {
                log::debug!("Tool finished {:?}", tool_result);

                event_stream.update_tool_call_fields(
                    &tool_result.tool_use_id,
                    acp::ToolCallUpdateFields {
                        status: Some(if tool_result.is_error {
                            acp::ToolCallStatus::Failed
                        } else {
                            acp::ToolCallStatus::Completed
                        }),
                        raw_output: tool_result.output.clone(),
                        ..Default::default()
                    },
                );
                this.update(cx, |this, _cx| {
                    this.pending_message()
                        .tool_results
                        .insert(tool_result.tool_use_id.clone(), tool_result);
                })?;
            }

            this.update(cx, |this, cx| {
                this.flush_pending_message(cx);
                if this.title.is_none() && this.pending_title_generation.is_none() {
                    this.generate_title(cx);
                }
            })?;

            if let Some(error) = error {
                attempt += 1;
                let retry = this.update(cx, |this, cx| {
                    let user_store = this.user_store.read(cx);
                    this.handle_completion_error(error, attempt, user_store.plan())
                })??;
                let timer = cx.background_executor().timer(retry.duration);
                event_stream.send_retry(retry);
                timer.await;
                // If the last message is an agent message without tool results, append a
                // `Resume` marker and retry as a user prompt. Otherwise `intent` is left as is.
                this.update(cx, |this, _cx| {
                    if let Some(Message::Agent(message)) = this.messages.last() {
                        if message.tool_results.is_empty() {
                            intent = CompletionIntent::UserPrompt;
                            this.messages.push(Message::Resume);
                        }
                    }
                })?;
            } else if this.read_with(cx, |this, _| this.tool_use_limit_reached)? {
                return Err(language_model::ToolUseLimitReachedError.into());
            } else if end_turn {
                return Ok(());
            } else {
                // Tool results are pending: send them back on the next iteration.
                intent = CompletionIntent::ToolResults;
                attempt = 0;
            }
        }
    }
1269
1270    fn handle_completion_error(
1271        &mut self,
1272        error: LanguageModelCompletionError,
1273        attempt: u8,
1274        plan: Option<Plan>,
1275    ) -> Result<acp_thread::RetryStatus> {
1276        let Some(model) = self.model.as_ref() else {
1277            return Err(anyhow!(error));
1278        };
1279
1280        let auto_retry = if model.provider_id() == ZED_CLOUD_PROVIDER_ID {
1281            match plan {
1282                Some(Plan::V2(_)) => true,
1283                Some(Plan::V1(_)) => self.completion_mode == CompletionMode::Burn,
1284                None => false,
1285            }
1286        } else {
1287            true
1288        };
1289
1290        if !auto_retry {
1291            return Err(anyhow!(error));
1292        }
1293
1294        let Some(strategy) = Self::retry_strategy_for(&error) else {
1295            return Err(anyhow!(error));
1296        };
1297
1298        let max_attempts = match &strategy {
1299            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1300            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1301        };
1302
1303        if attempt > max_attempts {
1304            return Err(anyhow!(error));
1305        }
1306
1307        let delay = match &strategy {
1308            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1309                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1310                Duration::from_secs(delay_secs)
1311            }
1312            RetryStrategy::Fixed { delay, .. } => *delay,
1313        };
1314        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1315
1316        Ok(acp_thread::RetryStatus {
1317            last_error: error.to_string().into(),
1318            attempt: attempt as usize,
1319            max_attempts: max_attempts as usize,
1320            started_at: Instant::now(),
1321            duration: delay,
1322        })
1323    }
1324
    /// Handles a single streamed completion event, updating the pending agent message and
    /// forwarding progress to `event_stream`.
    ///
    /// Returns an optional tool-result task, which the main agentic loop awaits and then sends
    /// back to the model.
    ///
    /// # Errors
    /// Returns `CompletionError::Refusal` or `CompletionError::MaxTokens` when the model stops
    /// for those reasons. `run_turn` maps these to the corresponding ACP stop reasons.
    fn handle_completion_event(
        &mut self,
        event: LanguageModelCompletionEvent,
        event_stream: &ThreadEventStream,
        cx: &mut Context<Self>,
    ) -> Result<Option<Task<LanguageModelToolResult>>> {
        log::trace!("Handling streamed completion event: {:?}", event);
        use LanguageModelCompletionEvent::*;

        match event {
            // A new message begins: flush the previous one and start an empty pending message.
            StartMessage { .. } => {
                self.flush_pending_message(cx);
                self.pending_message = Some(AgentMessage::default());
            }
            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
            Thinking { text, signature } => {
                self.handle_thinking_event(text, signature, event_stream, cx)
            }
            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
            ToolUse(tool_use) => {
                return Ok(self.handle_tool_use_event(tool_use, event_stream, cx));
            }
            // Malformed tool input is reported back to the model as an immediately-ready error.
            // NOTE(review): unlike `ToolUse`, this path neither records a `ToolUse` entry on the
            // pending message nor calls `send_tool_call`. Confirm the resulting tool result is
            // still paired with its call.
            ToolUseJsonParseError {
                id,
                tool_name,
                raw_input,
                json_parse_error,
            } => {
                return Ok(Some(Task::ready(
                    self.handle_tool_use_json_parse_error_event(
                        id,
                        tool_name,
                        raw_input,
                        json_parse_error,
                    ),
                )));
            }
            UsageUpdate(usage) => {
                telemetry::event!(
                    "Agent Thread Completion Usage Updated",
                    thread_id = self.id.to_string(),
                    prompt_id = self.prompt_id.to_string(),
                    model = self.model.as_ref().map(|m| m.telemetry_id()),
                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
                    input_tokens = usage.input_tokens,
                    output_tokens = usage.output_tokens,
                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
                    cache_read_input_tokens = usage.cache_read_input_tokens,
                );
                self.update_token_usage(usage, cx);
            }
            StatusUpdate(CompletionRequestStatus::UsageUpdated { amount, limit }) => {
                self.update_model_request_usage(amount, limit, cx);
            }
            StatusUpdate(
                CompletionRequestStatus::Started
                | CompletionRequestStatus::Queued { .. }
                | CompletionRequestStatus::Failed { .. },
            ) => {}
            // Checked by `run_turn_internal` after the response finishes.
            StatusUpdate(CompletionRequestStatus::ToolUseLimitReached) => {
                self.tool_use_limit_reached = true;
            }
            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
        }

        Ok(None)
    }
1397
1398    fn handle_text_event(
1399        &mut self,
1400        new_text: String,
1401        event_stream: &ThreadEventStream,
1402        cx: &mut Context<Self>,
1403    ) {
1404        event_stream.send_text(&new_text);
1405
1406        let last_message = self.pending_message();
1407        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1408            text.push_str(&new_text);
1409        } else {
1410            last_message
1411                .content
1412                .push(AgentMessageContent::Text(new_text));
1413        }
1414
1415        cx.notify();
1416    }
1417
1418    fn handle_thinking_event(
1419        &mut self,
1420        new_text: String,
1421        new_signature: Option<String>,
1422        event_stream: &ThreadEventStream,
1423        cx: &mut Context<Self>,
1424    ) {
1425        event_stream.send_thinking(&new_text);
1426
1427        let last_message = self.pending_message();
1428        if let Some(AgentMessageContent::Thinking { text, signature }) =
1429            last_message.content.last_mut()
1430        {
1431            text.push_str(&new_text);
1432            *signature = new_signature.or(signature.take());
1433        } else {
1434            last_message.content.push(AgentMessageContent::Thinking {
1435                text: new_text,
1436                signature: new_signature,
1437            });
1438        }
1439
1440        cx.notify();
1441    }
1442
1443    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
1444        let last_message = self.pending_message();
1445        last_message
1446            .content
1447            .push(AgentMessageContent::RedactedThinking(data));
1448        cx.notify();
1449    }
1450
    /// Records a (possibly still streaming) tool use on the pending message and, once its input
    /// is complete, starts running the tool.
    ///
    /// Recording works as follows:
    /// - If the pending message's last content item is not a tool use with this id, the call is
    ///   announced through `event_stream.send_tool_call` and appended.
    /// - Otherwise the stored tool use is replaced, and the existing tool call's title, kind and
    ///   raw input are updated.
    ///
    /// Returns `None` while the input is incomplete. Otherwise returns a task resolving to the
    /// tool result. The result is an error if:
    /// - no tool with that name exists;
    /// - the tool fails;
    /// - the tool returns an image for a model that does not support images.
    fn handle_tool_use_event(
        &mut self,
        tool_use: LanguageModelToolUse,
        event_stream: &ThreadEventStream,
        cx: &mut Context<Self>,
    ) -> Option<Task<LanguageModelToolResult>> {
        cx.notify();

        // Unknown tools are still shown, using the raw tool name as the title.
        let tool = self.tool(tool_use.name.as_ref());
        let mut title = SharedString::from(&tool_use.name);
        let mut kind = acp::ToolKind::Other;
        if let Some(tool) = tool.as_ref() {
            title = tool.initial_title(tool_use.input.clone(), cx);
            kind = tool.kind();
        }

        // Ensure the last message ends in the current tool use
        let last_message = self.pending_message();
        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
            if let AgentMessageContent::ToolUse(last_tool_use) = content {
                if last_tool_use.id == tool_use.id {
                    *last_tool_use = tool_use.clone();
                    false
                } else {
                    true
                }
            } else {
                true
            }
        });

        if push_new_tool_use {
            event_stream.send_tool_call(
                &tool_use.id,
                &tool_use.name,
                title,
                kind,
                tool_use.input.clone(),
            );
            last_message
                .content
                .push(AgentMessageContent::ToolUse(tool_use.clone()));
        } else {
            event_stream.update_tool_call_fields(
                &tool_use.id,
                acp::ToolCallUpdateFields {
                    title: Some(title.into()),
                    kind: Some(kind),
                    raw_input: Some(tool_use.input.clone()),
                    ..Default::default()
                },
            );
        }

        // Wait until the model has finished streaming the input before running anything.
        if !tool_use.is_input_complete {
            return None;
        }

        let Some(tool) = tool else {
            let content = format!("No tool named {} exists", tool_use.name);
            return Some(Task::ready(LanguageModelToolResult {
                content: LanguageModelToolResultContent::Text(Arc::from(content)),
                tool_use_id: tool_use.id,
                tool_name: tool_use.name,
                is_error: true,
                output: None,
            }));
        };

        let fs = self.project.read(cx).fs().clone();
        let tool_event_stream =
            ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
        tool_event_stream.update_fields(acp::ToolCallUpdateFields {
            status: Some(acp::ToolCallStatus::InProgress),
            ..Default::default()
        });
        // Read before spawning so the future below doesn't need access to `self`.
        let supports_images = self.model().is_some_and(|model| model.supports_images());
        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
        log::debug!("Running tool {}", tool_use.name);
        Some(cx.foreground_executor().spawn(async move {
            // Reject image output the current model can't consume.
            let tool_result = tool_result.await.and_then(|output| {
                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
                    && !supports_images
                {
                    return Err(anyhow!(
                        "Attempted to read an image, but this model doesn't support it.",
                    ));
                }
                Ok(output)
            });

            match tool_result {
                Ok(output) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: false,
                    content: output.llm_output,
                    output: Some(output.raw_output),
                },
                Err(error) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: true,
                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
                    output: Some(error.to_string().into()),
                },
            }
        }))
    }
1560
1561    fn handle_tool_use_json_parse_error_event(
1562        &mut self,
1563        tool_use_id: LanguageModelToolUseId,
1564        tool_name: Arc<str>,
1565        raw_input: Arc<str>,
1566        json_parse_error: String,
1567    ) -> LanguageModelToolResult {
1568        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
1569        LanguageModelToolResult {
1570            tool_use_id,
1571            tool_name,
1572            is_error: true,
1573            content: LanguageModelToolResultContent::Text(tool_output.into()),
1574            output: Some(serde_json::Value::String(raw_input.to_string())),
1575        }
1576    }
1577
1578    fn update_model_request_usage(&self, amount: usize, limit: UsageLimit, cx: &mut Context<Self>) {
1579        self.project
1580            .read(cx)
1581            .user_store()
1582            .update(cx, |user_store, cx| {
1583                user_store.update_model_request_usage(
1584                    ModelRequestUsage(RequestUsage {
1585                        amount: amount as i32,
1586                        limit,
1587                    }),
1588                    cx,
1589                )
1590            });
1591    }
1592
1593    pub fn title(&self) -> SharedString {
1594        self.title.clone().unwrap_or("New Thread".into())
1595    }
1596
1597    pub fn is_generating_summary(&self) -> bool {
1598        self.pending_summary_generation.is_some()
1599    }
1600
1601    pub fn summary(&mut self, cx: &mut Context<Self>) -> Shared<Task<Option<SharedString>>> {
1602        if let Some(summary) = self.summary.as_ref() {
1603            return Task::ready(Some(summary.clone())).shared();
1604        }
1605        if let Some(task) = self.pending_summary_generation.clone() {
1606            return task;
1607        }
1608        let Some(model) = self.summarization_model.clone() else {
1609            log::error!("No summarization model available");
1610            return Task::ready(None).shared();
1611        };
1612        let mut request = LanguageModelRequest {
1613            intent: Some(CompletionIntent::ThreadContextSummarization),
1614            temperature: AgentSettings::temperature_for_model(&model, cx),
1615            ..Default::default()
1616        };
1617
1618        for message in &self.messages {
1619            request.messages.extend(message.to_request());
1620        }
1621
1622        request.messages.push(LanguageModelRequestMessage {
1623            role: Role::User,
1624            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
1625            cache: false,
1626        });
1627
1628        let task = cx
1629            .spawn(async move |this, cx| {
1630                let mut summary = String::new();
1631                let mut messages = model.stream_completion(request, cx).await.log_err()?;
1632                while let Some(event) = messages.next().await {
1633                    let event = event.log_err()?;
1634                    let text = match event {
1635                        LanguageModelCompletionEvent::Text(text) => text,
1636                        LanguageModelCompletionEvent::StatusUpdate(
1637                            CompletionRequestStatus::UsageUpdated { amount, limit },
1638                        ) => {
1639                            this.update(cx, |thread, cx| {
1640                                thread.update_model_request_usage(amount, limit, cx);
1641                            })
1642                            .ok()?;
1643                            continue;
1644                        }
1645                        _ => continue,
1646                    };
1647
1648                    let mut lines = text.lines();
1649                    summary.extend(lines.next());
1650                }
1651
1652                log::debug!("Setting summary: {}", summary);
1653                let summary = SharedString::from(summary);
1654
1655                this.update(cx, |this, cx| {
1656                    this.summary = Some(summary.clone());
1657                    this.pending_summary_generation = None;
1658                    cx.notify()
1659                })
1660                .ok()?;
1661
1662                Some(summary)
1663            })
1664            .shared();
1665        self.pending_summary_generation = Some(task.clone());
1666        task
1667    }
1668
1669    fn generate_title(&mut self, cx: &mut Context<Self>) {
1670        let Some(model) = self.summarization_model.clone() else {
1671            return;
1672        };
1673
1674        log::debug!(
1675            "Generating title with model: {:?}",
1676            self.summarization_model.as_ref().map(|model| model.name())
1677        );
1678        let mut request = LanguageModelRequest {
1679            intent: Some(CompletionIntent::ThreadSummarization),
1680            temperature: AgentSettings::temperature_for_model(&model, cx),
1681            ..Default::default()
1682        };
1683
1684        for message in &self.messages {
1685            request.messages.extend(message.to_request());
1686        }
1687
1688        request.messages.push(LanguageModelRequestMessage {
1689            role: Role::User,
1690            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
1691            cache: false,
1692        });
1693        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
1694            let mut title = String::new();
1695
1696            let generate = async {
1697                let mut messages = model.stream_completion(request, cx).await?;
1698                while let Some(event) = messages.next().await {
1699                    let event = event?;
1700                    let text = match event {
1701                        LanguageModelCompletionEvent::Text(text) => text,
1702                        LanguageModelCompletionEvent::StatusUpdate(
1703                            CompletionRequestStatus::UsageUpdated { amount, limit },
1704                        ) => {
1705                            this.update(cx, |thread, cx| {
1706                                thread.update_model_request_usage(amount, limit, cx);
1707                            })?;
1708                            continue;
1709                        }
1710                        _ => continue,
1711                    };
1712
1713                    let mut lines = text.lines();
1714                    title.extend(lines.next());
1715
1716                    // Stop if the LLM generated multiple lines.
1717                    if lines.next().is_some() {
1718                        break;
1719                    }
1720                }
1721                anyhow::Ok(())
1722            };
1723
1724            if generate.await.context("failed to generate title").is_ok() {
1725                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
1726            }
1727            _ = this.update(cx, |this, _| this.pending_title_generation = None);
1728        }));
1729    }
1730
1731    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1732        self.pending_title_generation = None;
1733        if Some(&title) != self.title.as_ref() {
1734            self.title = Some(title);
1735            cx.emit(TitleUpdated);
1736            cx.notify();
1737        }
1738    }
1739
1740    fn clear_summary(&mut self) {
1741        self.summary = None;
1742        self.pending_summary_generation = None;
1743    }
1744
1745    fn last_user_message(&self) -> Option<&UserMessage> {
1746        self.messages
1747            .iter()
1748            .rev()
1749            .find_map(|message| match message {
1750                Message::User(user_message) => Some(user_message),
1751                Message::Agent(_) => None,
1752                Message::Resume => None,
1753            })
1754    }
1755
1756    fn pending_message(&mut self) -> &mut AgentMessage {
1757        self.pending_message.get_or_insert_default()
1758    }
1759
1760    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
1761        let Some(mut message) = self.pending_message.take() else {
1762            return;
1763        };
1764
1765        if message.content.is_empty() {
1766            return;
1767        }
1768
1769        for content in &message.content {
1770            let AgentMessageContent::ToolUse(tool_use) = content else {
1771                continue;
1772            };
1773
1774            if !message.tool_results.contains_key(&tool_use.id) {
1775                message.tool_results.insert(
1776                    tool_use.id.clone(),
1777                    LanguageModelToolResult {
1778                        tool_use_id: tool_use.id.clone(),
1779                        tool_name: tool_use.name.clone(),
1780                        is_error: true,
1781                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
1782                        output: None,
1783                    },
1784                );
1785            }
1786        }
1787
1788        self.messages.push(Message::Agent(message));
1789        self.updated_at = Utc::now();
1790        self.clear_summary();
1791        cx.notify()
1792    }
1793
1794    pub(crate) fn build_completion_request(
1795        &self,
1796        completion_intent: CompletionIntent,
1797        cx: &App,
1798    ) -> Result<LanguageModelRequest> {
1799        let model = self.model().context("No language model configured")?;
1800        let tools = if let Some(turn) = self.running_turn.as_ref() {
1801            turn.tools
1802                .iter()
1803                .filter_map(|(tool_name, tool)| {
1804                    log::trace!("Including tool: {}", tool_name);
1805                    Some(LanguageModelRequestTool {
1806                        name: tool_name.to_string(),
1807                        description: tool.description().to_string(),
1808                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1809                    })
1810                })
1811                .collect::<Vec<_>>()
1812        } else {
1813            Vec::new()
1814        };
1815
1816        log::debug!("Building completion request");
1817        log::debug!("Completion intent: {:?}", completion_intent);
1818        log::debug!("Completion mode: {:?}", self.completion_mode);
1819
1820        let available_tools: Vec<_> = self
1821            .running_turn
1822            .as_ref()
1823            .map(|turn| turn.tools.keys().cloned().collect())
1824            .unwrap_or_default();
1825
1826        log::debug!("Request includes {} tools", available_tools.len());
1827        let messages = self.build_request_messages(available_tools, cx);
1828        log::debug!("Request will include {} messages", messages.len());
1829
1830        let request = LanguageModelRequest {
1831            thread_id: Some(self.id.to_string()),
1832            prompt_id: Some(self.prompt_id.to_string()),
1833            intent: Some(completion_intent),
1834            mode: Some(self.completion_mode.into()),
1835            messages,
1836            tools,
1837            tool_choice: None,
1838            stop: Vec::new(),
1839            temperature: AgentSettings::temperature_for_model(model, cx),
1840            thinking_allowed: true,
1841        };
1842
1843        log::debug!("Completion request built successfully");
1844        Ok(request)
1845    }
1846
1847    fn enabled_tools(
1848        &self,
1849        profile: &AgentProfileSettings,
1850        model: &Arc<dyn LanguageModel>,
1851        cx: &App,
1852    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
1853        fn truncate(tool_name: &SharedString) -> SharedString {
1854            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
1855                let mut truncated = tool_name.to_string();
1856                truncated.truncate(MAX_TOOL_NAME_LENGTH);
1857                truncated.into()
1858            } else {
1859                tool_name.clone()
1860            }
1861        }
1862
1863        let mut tools = self
1864            .tools
1865            .iter()
1866            .filter_map(|(tool_name, tool)| {
1867                if tool.supports_provider(&model.provider_id())
1868                    && profile.is_tool_enabled(tool_name)
1869                {
1870                    Some((truncate(tool_name), tool.clone()))
1871                } else {
1872                    None
1873                }
1874            })
1875            .collect::<BTreeMap<_, _>>();
1876
1877        let mut context_server_tools = Vec::new();
1878        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
1879        let mut duplicate_tool_names = HashSet::default();
1880        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
1881            for (tool_name, tool) in server_tools {
1882                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
1883                    let tool_name = truncate(tool_name);
1884                    if !seen_tools.insert(tool_name.clone()) {
1885                        duplicate_tool_names.insert(tool_name.clone());
1886                    }
1887                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
1888                }
1889            }
1890        }
1891
1892        // When there are duplicate tool names, disambiguate by prefixing them
1893        // with the server ID. In the rare case there isn't enough space for the
1894        // disambiguated tool name, keep only the last tool with this name.
1895        for (server_id, tool_name, tool) in context_server_tools {
1896            if duplicate_tool_names.contains(&tool_name) {
1897                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
1898                if available >= 2 {
1899                    let mut disambiguated = server_id.0.to_string();
1900                    disambiguated.truncate(available - 1);
1901                    disambiguated.push('_');
1902                    disambiguated.push_str(&tool_name);
1903                    tools.insert(disambiguated.into(), tool.clone());
1904                } else {
1905                    tools.insert(tool_name, tool.clone());
1906                }
1907            } else {
1908                tools.insert(tool_name, tool.clone());
1909            }
1910        }
1911
1912        tools
1913    }
1914
1915    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
1916        self.running_turn.as_ref()?.tools.get(name).cloned()
1917    }
1918
1919    fn build_request_messages(
1920        &self,
1921        available_tools: Vec<SharedString>,
1922        cx: &App,
1923    ) -> Vec<LanguageModelRequestMessage> {
1924        log::trace!(
1925            "Building request messages from {} thread messages",
1926            self.messages.len()
1927        );
1928
1929        let system_prompt = SystemPromptTemplate {
1930            project: self.project_context.read(cx),
1931            available_tools,
1932            model_name: self.model.as_ref().map(|m| m.name().0.to_string()),
1933        }
1934        .render(&self.templates)
1935        .context("failed to build system prompt")
1936        .expect("Invalid template");
1937        let mut messages = vec![LanguageModelRequestMessage {
1938            role: Role::System,
1939            content: vec![system_prompt.into()],
1940            cache: false,
1941        }];
1942        for message in &self.messages {
1943            messages.extend(message.to_request());
1944        }
1945
1946        if let Some(last_message) = messages.last_mut() {
1947            last_message.cache = true;
1948        }
1949
1950        if let Some(message) = self.pending_message.as_ref() {
1951            messages.extend(message.to_request());
1952        }
1953
1954        messages
1955    }
1956
1957    pub fn to_markdown(&self) -> String {
1958        let mut markdown = String::new();
1959        for (ix, message) in self.messages.iter().enumerate() {
1960            if ix > 0 {
1961                markdown.push('\n');
1962            }
1963            markdown.push_str(&message.to_markdown());
1964        }
1965
1966        if let Some(message) = self.pending_message.as_ref() {
1967            markdown.push('\n');
1968            markdown.push_str(&message.to_markdown());
1969        }
1970
1971        markdown
1972    }
1973
1974    fn advance_prompt_id(&mut self) {
1975        self.prompt_id = PromptId::new();
1976    }
1977
    /// Chooses how, and whether, to retry a completion request that failed with `error`.
    ///
    /// Returns `None` when retrying cannot help. Otherwise returns the delay policy and the
    /// attempt budget (`max_attempts`).
    ///
    /// Arm order matters. The specific `HttpResponseError` status arms (429 first, then
    /// 413/403/401) match before the generic 4xx/5xx guard arm and the final catch-all arm.
    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
        use LanguageModelCompletionError::*;
        use http_client::StatusCode;

        // General strategy here:
        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), allow up to
        //   `MAX_RETRY_ATTEMPTS` attempts. A plain HTTP 429 response uses exponential backoff; the
        //   other cases use a fixed delay that honors the server's `retry_after` hint when present.
        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), allow up to 3 attempts.
        // - Anything not matched more specifically still gets a small fixed budget (1 or 2 attempts).
        match error {
            HttpResponseError {
                status_code: StatusCode::TOO_MANY_REQUESTS,
                ..
            } => Some(RetryStrategy::ExponentialBackoff {
                initial_delay: BASE_RETRY_DELAY,
                max_attempts: MAX_RETRY_ATTEMPTS,
            }),
            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
                Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    max_attempts: MAX_RETRY_ATTEMPTS,
                })
            }
            UpstreamProviderError {
                status,
                retry_after,
                ..
            } => match *status {
                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
                    Some(RetryStrategy::Fixed {
                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                        max_attempts: MAX_RETRY_ATTEMPTS,
                    })
                }
                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    // Internal Server Error could be anything, retry up to 3 times.
                    max_attempts: 3,
                }),
                status => {
                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
                    // but we frequently get them in practice. See https://http.dev/529
                    if status.as_u16() == 529 {
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: MAX_RETRY_ATTEMPTS,
                        })
                    } else {
                        // Any other upstream status gets a small fixed budget.
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: 2,
                        })
                    }
                }
            },
            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            ApiReadResponseError { .. }
            | HttpSend { .. }
            | DeserializeResponse { .. }
            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            // Retrying these errors definitely shouldn't help.
            HttpResponseError {
                status_code:
                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
                ..
            }
            | AuthenticationError { .. }
            | PermissionError { .. }
            | NoApiKey { .. }
            | ApiEndpointNotFound { .. }
            | PromptTooLarge { .. } => None,
            // These errors might be transient, so allow a single attempt (max_attempts: 1).
            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 1,
            }),
            // All other 4xx and 5xx HTTP responses get up to 3 attempts.
            HttpResponseError { status_code, .. }
                if status_code.is_client_error() || status_code.is_server_error() =>
            {
                Some(RetryStrategy::Fixed {
                    delay: BASE_RETRY_DELAY,
                    max_attempts: 3,
                })
            }
            Other(err)
                if err.is::<language_model::PaymentRequiredError>()
                    || err.is::<language_model::ModelRequestLimitReachedError>() =>
            {
                // Retrying won't help for Payment Required or Model Request Limit errors (where
                // the user must upgrade to usage-based billing to get more requests, or else wait
                // for a significant amount of time for the request limit to reset).
                None
            }
            // Everything left gets a small fixed budget (max_attempts: 2): HTTP responses outside
            // 4xx/5xx, and `Other` errors not handled above. They are *not* treated as
            // non-retryable.
            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 2,
            }),
        }
    }
2084}
2085
/// State of the turn a thread is currently running: the task that drives it, the event stream
/// used to report it, and the tools chosen for it.
///
/// NOTE(review): fields are dropped in declaration order, so `_task` is dropped before
/// `event_stream`. Keep this in mind before reordering them.
struct RunningTurn {
    /// Holds the task that handles agent interaction until the end of the turn.
    /// It survives across multiple requests while the model makes tool calls and
    /// we run the tools and report their results.
    _task: Task<()>,
    /// The current event stream for the running turn. Used to report a final
    /// cancellation event if we cancel the turn.
    event_stream: ThreadEventStream,
    /// The tools that were enabled for this turn, keyed by the name advertised
    /// to the model (see `build_completion_request`).
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
}
2097
2098impl RunningTurn {
2099    fn cancel(self) {
2100        log::debug!("Cancelling in progress turn");
2101        self.event_stream.send_canceled();
2102    }
2103}
2104
/// Event emitted by a [`Thread`] carrying its latest token usage.
///
/// NOTE(review): presumably `None` means usage is unknown or was reset; confirm against the
/// emit sites.
pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);

impl EventEmitter<TokenUsageUpdated> for Thread {}

/// Event emitted by `Thread::set_title` when the title actually changes.
pub struct TitleUpdated;

impl EventEmitter<TitleUpdated> for Thread {}
2112
2113pub trait AgentTool
2114where
2115    Self: 'static + Sized,
2116{
2117    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
2118    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
2119
2120    fn name() -> &'static str;
2121
2122    fn description() -> SharedString {
2123        let schema = schemars::schema_for!(Self::Input);
2124        SharedString::new(
2125            schema
2126                .get("description")
2127                .and_then(|description| description.as_str())
2128                .unwrap_or_default(),
2129        )
2130    }
2131
2132    fn kind() -> acp::ToolKind;
2133
2134    /// The initial tool title to display. Can be updated during the tool run.
2135    fn initial_title(
2136        &self,
2137        input: Result<Self::Input, serde_json::Value>,
2138        cx: &mut App,
2139    ) -> SharedString;
2140
2141    /// Returns the JSON schema that describes the tool's input.
2142    fn input_schema(format: LanguageModelToolSchemaFormat) -> Schema {
2143        language_model::tool_schema::root_schema_for::<Self::Input>(format)
2144    }
2145
2146    /// Some tools rely on a provider for the underlying billing or other reasons.
2147    /// Allow the tool to check if they are compatible, or should be filtered out.
2148    fn supports_provider(_provider: &LanguageModelProviderId) -> bool {
2149        true
2150    }
2151
2152    /// Runs the tool with the provided input.
2153    fn run(
2154        self: Arc<Self>,
2155        input: Self::Input,
2156        event_stream: ToolCallEventStream,
2157        cx: &mut App,
2158    ) -> Task<Result<Self::Output>>;
2159
2160    /// Emits events for a previous execution of the tool.
2161    fn replay(
2162        &self,
2163        _input: Self::Input,
2164        _output: Self::Output,
2165        _event_stream: ToolCallEventStream,
2166        _cx: &mut App,
2167    ) -> Result<()> {
2168        Ok(())
2169    }
2170
2171    fn erase(self) -> Arc<dyn AnyAgentTool> {
2172        Arc::new(Erased(Arc::new(self)))
2173    }
2174}
2175
/// Type-erasing adapter. `Erased<Arc<T>>` for a concrete [`AgentTool`] `T` implements
/// [`AnyAgentTool`] by converting inputs and outputs to and from JSON.
pub struct Erased<T>(T);

/// Result of running a type-erased tool.
pub struct AgentToolOutput {
    /// Content returned to the language model.
    pub llm_output: LanguageModelToolResultContent,
    /// The tool's typed output serialized as JSON; stored as the tool result's `output`.
    pub raw_output: serde_json::Value,
}
2182
/// Object-safe, JSON-based counterpart of [`AgentTool`]. It lets tools with different
/// input/output types be stored together as `Arc<dyn AnyAgentTool>`.
pub trait AnyAgentTool {
    /// The tool's name.
    fn name(&self) -> SharedString;
    /// Description of the tool sent to the model.
    fn description(&self) -> SharedString;
    /// The kind of tool, reported alongside tool calls.
    fn kind(&self) -> acp::ToolKind;
    /// Title for a tool call, computed from its raw JSON input (which may fail to parse).
    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
    /// JSON schema describing the tool's input, for the given schema `format`.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
    /// Whether the tool may be offered to models from this provider. Defaults to `true`.
    fn supports_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }
    /// Runs the tool on JSON input, producing model-facing content plus the raw JSON output.
    fn run(
        self: Arc<Self>,
        input: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<AgentToolOutput>>;
    /// Re-emits the events of a previous run from its recorded JSON input and output.
    fn replay(
        &self,
        input: serde_json::Value,
        output: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Result<()>;
}
2206
2207impl<T> AnyAgentTool for Erased<Arc<T>>
2208where
2209    T: AgentTool,
2210{
2211    fn name(&self) -> SharedString {
2212        T::name().into()
2213    }
2214
2215    fn description(&self) -> SharedString {
2216        T::description()
2217    }
2218
2219    fn kind(&self) -> agent_client_protocol::ToolKind {
2220        T::kind()
2221    }
2222
2223    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2224        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2225        self.0.initial_title(parsed_input, _cx)
2226    }
2227
2228    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2229        let mut json = serde_json::to_value(T::input_schema(format))?;
2230        language_model::tool_schema::adapt_schema_to_format(&mut json, format)?;
2231        Ok(json)
2232    }
2233
2234    fn supports_provider(&self, provider: &LanguageModelProviderId) -> bool {
2235        T::supports_provider(provider)
2236    }
2237
2238    fn run(
2239        self: Arc<Self>,
2240        input: serde_json::Value,
2241        event_stream: ToolCallEventStream,
2242        cx: &mut App,
2243    ) -> Task<Result<AgentToolOutput>> {
2244        cx.spawn(async move |cx| {
2245            let input = serde_json::from_value(input)?;
2246            let output = cx
2247                .update(|cx| self.0.clone().run(input, event_stream, cx))?
2248                .await?;
2249            let raw_output = serde_json::to_value(&output)?;
2250            Ok(AgentToolOutput {
2251                llm_output: output.into(),
2252                raw_output,
2253            })
2254        })
2255    }
2256
2257    fn replay(
2258        &self,
2259        input: serde_json::Value,
2260        output: serde_json::Value,
2261        event_stream: ToolCallEventStream,
2262        cx: &mut App,
2263    ) -> Result<()> {
2264        let input = serde_json::from_value(input)?;
2265        let output = serde_json::from_value(output)?;
2266        self.0.replay(input, output, event_stream, cx)
2267    }
2268}
2269
/// Sending half of the channel that reports a turn's events. Each item is either a
/// [`ThreadEvent`] or an error. The sender methods ignore send failures (for example, when the
/// receiver has been dropped).
#[derive(Clone)]
struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2272
2273impl ThreadEventStream {
2274    fn send_user_message(&self, message: &UserMessage) {
2275        self.0
2276            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2277            .ok();
2278    }
2279
2280    fn send_text(&self, text: &str) {
2281        self.0
2282            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2283            .ok();
2284    }
2285
2286    fn send_thinking(&self, text: &str) {
2287        self.0
2288            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2289            .ok();
2290    }
2291
2292    fn send_tool_call(
2293        &self,
2294        id: &LanguageModelToolUseId,
2295        tool_name: &str,
2296        title: SharedString,
2297        kind: acp::ToolKind,
2298        input: serde_json::Value,
2299    ) {
2300        self.0
2301            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2302                id,
2303                tool_name,
2304                title.to_string(),
2305                kind,
2306                input,
2307            ))))
2308            .ok();
2309    }
2310
2311    fn initial_tool_call(
2312        id: &LanguageModelToolUseId,
2313        tool_name: &str,
2314        title: String,
2315        kind: acp::ToolKind,
2316        input: serde_json::Value,
2317    ) -> acp::ToolCall {
2318        acp::ToolCall {
2319            meta: Some(serde_json::json!({
2320                "tool_name": tool_name
2321            })),
2322            id: acp::ToolCallId(id.to_string().into()),
2323            title,
2324            kind,
2325            status: acp::ToolCallStatus::Pending,
2326            content: vec![],
2327            locations: vec![],
2328            raw_input: Some(input),
2329            raw_output: None,
2330        }
2331    }
2332
2333    fn update_tool_call_fields(
2334        &self,
2335        tool_use_id: &LanguageModelToolUseId,
2336        fields: acp::ToolCallUpdateFields,
2337    ) {
2338        self.0
2339            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2340                acp::ToolCallUpdate {
2341                    meta: None,
2342                    id: acp::ToolCallId(tool_use_id.to_string().into()),
2343                    fields,
2344                }
2345                .into(),
2346            )))
2347            .ok();
2348    }
2349
2350    fn send_retry(&self, status: acp_thread::RetryStatus) {
2351        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2352    }
2353
2354    fn send_stop(&self, reason: acp::StopReason) {
2355        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2356    }
2357
2358    fn send_canceled(&self) {
2359        self.0
2360            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2361            .ok();
2362    }
2363
2364    fn send_error(&self, error: impl Into<anyhow::Error>) {
2365        self.0.unbounded_send(Err(error.into())).ok();
2366    }
2367}
2368
/// Per-tool-call handle onto a thread's event stream, passed to [`AgentTool::run`] and
/// [`AgentTool::replay`]. The updates it sends identify the tool call by `tool_use_id`.
#[derive(Clone)]
pub struct ToolCallEventStream {
    /// ID of the tool call this stream reports on.
    tool_use_id: LanguageModelToolUseId,
    /// Underlying thread event channel.
    stream: ThreadEventStream,
    /// Optional filesystem handle (`None` in `ToolCallEventStream::test`).
    fs: Option<Arc<dyn Fs>>,
}
2375
2376impl ToolCallEventStream {
2377    #[cfg(any(test, feature = "test-support"))]
2378    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
2379        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2380
2381        let stream = ToolCallEventStream::new("test_id".into(), ThreadEventStream(events_tx), None);
2382
2383        (stream, ToolCallEventStreamReceiver(events_rx))
2384    }
2385
2386    fn new(
2387        tool_use_id: LanguageModelToolUseId,
2388        stream: ThreadEventStream,
2389        fs: Option<Arc<dyn Fs>>,
2390    ) -> Self {
2391        Self {
2392            tool_use_id,
2393            stream,
2394            fs,
2395        }
2396    }
2397
2398    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
2399        self.stream
2400            .update_tool_call_fields(&self.tool_use_id, fields);
2401    }
2402
2403    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2404        self.stream
2405            .0
2406            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2407                acp_thread::ToolCallUpdateDiff {
2408                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2409                    diff,
2410                }
2411                .into(),
2412            )))
2413            .ok();
2414    }
2415
2416    pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
2417        if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
2418            return Task::ready(Ok(()));
2419        }
2420
2421        let (response_tx, response_rx) = oneshot::channel();
2422        self.stream
2423            .0
2424            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
2425                ToolCallAuthorization {
2426                    tool_call: acp::ToolCallUpdate {
2427                        meta: None,
2428                        id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2429                        fields: acp::ToolCallUpdateFields {
2430                            title: Some(title.into()),
2431                            ..Default::default()
2432                        },
2433                    },
2434                    options: vec![
2435                        acp::PermissionOption {
2436                            id: acp::PermissionOptionId("always_allow".into()),
2437                            name: "Always Allow".into(),
2438                            kind: acp::PermissionOptionKind::AllowAlways,
2439                            meta: None,
2440                        },
2441                        acp::PermissionOption {
2442                            id: acp::PermissionOptionId("allow".into()),
2443                            name: "Allow".into(),
2444                            kind: acp::PermissionOptionKind::AllowOnce,
2445                            meta: None,
2446                        },
2447                        acp::PermissionOption {
2448                            id: acp::PermissionOptionId("deny".into()),
2449                            name: "Deny".into(),
2450                            kind: acp::PermissionOptionKind::RejectOnce,
2451                            meta: None,
2452                        },
2453                    ],
2454                    response: response_tx,
2455                },
2456            )))
2457            .ok();
2458        let fs = self.fs.clone();
2459        cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
2460            "always_allow" => {
2461                if let Some(fs) = fs.clone() {
2462                    cx.update(|cx| {
2463                        update_settings_file(fs, cx, |settings, _| {
2464                            settings
2465                                .agent
2466                                .get_or_insert_default()
2467                                .set_always_allow_tool_actions(true);
2468                        });
2469                    })?;
2470                }
2471
2472                Ok(())
2473            }
2474            "allow" => Ok(()),
2475            _ => Err(anyhow!("Permission to run tool denied by user")),
2476        })
2477    }
2478}
2479
/// Receiving end of the channel behind `ToolCallEventStream::test`.
///
/// Provides helpers that await the next event and assert on its kind.
#[cfg(any(test, feature = "test-support"))]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
2482
#[cfg(any(test, feature = "test-support"))]
impl ToolCallEventStreamReceiver {
    /// Awaits the next event and returns it as a tool call authorization request.
    ///
    /// # Panics
    /// Panics if the channel has ended, yielded an error, or yielded another event.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) => auth,
            other => panic!("Expected ToolCallAuthorization but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the fields of a tool call field update.
    ///
    /// # Panics
    /// Panics if the channel has ended, yielded an error, or yielded another event.
    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            )))) => update.fields,
            other => panic!("Expected update fields but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the diff of a tool call diff update.
    ///
    /// # Panics
    /// Panics if the channel has ended, yielded an error, or yielded another event.
    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
                update,
            )))) => update.diff,
            other => panic!("Expected diff but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the terminal of a tool call terminal update.
    ///
    /// # Panics
    /// Panics if the channel has ended, yielded an error, or yielded another event.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
                update,
            )))) => update.terminal,
            other => panic!("Expected terminal but got: {:?}", other),
        }
    }
}
2530
#[cfg(any(test, feature = "test-support"))]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    /// Exposes the wrapped channel receiver so tests can call its methods directly.
    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2539
#[cfg(any(test, feature = "test-support"))]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    /// Mutable access to the wrapped receiver, e.g. for `&mut self` methods like `next`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2546
2547impl From<&str> for UserMessageContent {
2548    fn from(text: &str) -> Self {
2549        Self::Text(text.into())
2550    }
2551}
2552
2553impl UserMessageContent {
2554    pub fn from_content_block(value: acp::ContentBlock, path_style: PathStyle) -> Self {
2555        match value {
2556            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
2557            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
2558            acp::ContentBlock::Audio(_) => {
2559                // TODO
2560                Self::Text("[audio]".to_string())
2561            }
2562            acp::ContentBlock::ResourceLink(resource_link) => {
2563                match MentionUri::parse(&resource_link.uri, path_style) {
2564                    Ok(uri) => Self::Mention {
2565                        uri,
2566                        content: String::new(),
2567                    },
2568                    Err(err) => {
2569                        log::error!("Failed to parse mention link: {}", err);
2570                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
2571                    }
2572                }
2573            }
2574            acp::ContentBlock::Resource(resource) => match resource.resource {
2575                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
2576                    match MentionUri::parse(&resource.uri, path_style) {
2577                        Ok(uri) => Self::Mention {
2578                            uri,
2579                            content: resource.text,
2580                        },
2581                        Err(err) => {
2582                            log::error!("Failed to parse mention link: {}", err);
2583                            Self::Text(
2584                                MarkdownCodeBlock {
2585                                    tag: &resource.uri,
2586                                    text: &resource.text,
2587                                }
2588                                .to_string(),
2589                            )
2590                        }
2591                    }
2592                }
2593                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
2594                    // TODO
2595                    Self::Text("[blob]".to_string())
2596                }
2597            },
2598        }
2599    }
2600}
2601
2602impl From<UserMessageContent> for acp::ContentBlock {
2603    fn from(content: UserMessageContent) -> Self {
2604        match content {
2605            UserMessageContent::Text(text) => acp::ContentBlock::Text(acp::TextContent {
2606                text,
2607                annotations: None,
2608                meta: None,
2609            }),
2610            UserMessageContent::Image(image) => acp::ContentBlock::Image(acp::ImageContent {
2611                data: image.source.to_string(),
2612                mime_type: "image/png".to_string(),
2613                meta: None,
2614                annotations: None,
2615                uri: None,
2616            }),
2617            UserMessageContent::Mention { uri, content } => {
2618                acp::ContentBlock::Resource(acp::EmbeddedResource {
2619                    meta: None,
2620                    resource: acp::EmbeddedResourceResource::TextResourceContents(
2621                        acp::TextResourceContents {
2622                            meta: None,
2623                            mime_type: None,
2624                            text: content,
2625                            uri: uri.to_uri().to_string(),
2626                        },
2627                    ),
2628                    annotations: None,
2629                })
2630            }
2631        }
2632    }
2633}
2634
2635fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
2636    LanguageModelImage {
2637        source: image_content.data.into(),
2638        // TODO: make this optional?
2639        size: gpui::Size::new(0.into(), 0.into()),
2640    }
2641}