thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ReadFileTool, SystemPromptTemplate,
   5    Template, Templates, TerminalTool, ThinkingTool, WebSearchTool,
   6};
   7use acp_thread::{MentionUri, UserMessageId};
   8use action_log::ActionLog;
   9use agent::thread::{GitState, ProjectSnapshot, WorktreeSnapshot};
  10use agent_client_protocol as acp;
  11use agent_settings::{
  12    AgentProfileId, AgentProfileSettings, AgentSettings, CompletionMode,
  13    SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
  14};
  15use anyhow::{Context as _, Result, anyhow};
  16use assistant_tool::adapt_schema_to_format;
  17use chrono::{DateTime, Utc};
  18use client::{ModelRequestUsage, RequestUsage};
  19use cloud_llm_client::{CompletionIntent, CompletionRequestStatus, UsageLimit};
  20use collections::{HashMap, HashSet, IndexMap};
  21use fs::Fs;
  22use futures::{
  23    FutureExt,
  24    channel::{mpsc, oneshot},
  25    future::Shared,
  26    stream::FuturesUnordered,
  27};
  28use git::repository::DiffType;
  29use gpui::{
  30    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  31};
  32use language_model::{
  33    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelExt,
  34    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  35    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  36    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  37    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage,
  38};
  39use project::{
  40    Project,
  41    git_store::{GitStore, RepositoryState},
  42};
  43use prompt_store::ProjectContext;
  44use schemars::{JsonSchema, Schema};
  45use serde::{Deserialize, Serialize};
  46use settings::{Settings, update_settings_file};
  47use smol::stream::StreamExt;
  48use std::{
  49    collections::BTreeMap,
  50    ops::RangeInclusive,
  51    path::Path,
  52    rc::Rc,
  53    sync::Arc,
  54    time::{Duration, Instant},
  55};
  56use std::{fmt::Write, path::PathBuf};
  57use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock};
  58use uuid::Uuid;
  59
/// Text used for a tool call the user canceled (presumably reported back as the
/// tool's result; the use site is not visible here — confirm).
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
/// Upper bound on the length of a tool name (units and enforcement point are not
/// visible here — presumably checked when tools are registered; confirm).
pub const MAX_TOOL_NAME_LENGTH: usize = 64;
  62
  63/// The ID of the user prompt that initiated a request.
  64///
  65/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
  66#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  67pub struct PromptId(Arc<str>);
  68
  69impl PromptId {
  70    pub fn new() -> Self {
  71        Self(Uuid::new_v4().to_string().into())
  72    }
  73}
  74
  75impl std::fmt::Display for PromptId {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        write!(f, "{}", self.0)
  78    }
  79}
  80
/// Maximum number of retry attempts for a failed request (use site not visible
/// here — presumably fed into a [`RetryStrategy`]; confirm).
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Base delay between retries (presumably the starting delay for backoff; confirm).
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);

/// How a failed request should be retried.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Delay grows from `initial_delay` between attempts, up to `max_attempts`
    /// (exact growth factor is decided at the use site — confirm).
    ExponentialBackoff {
        initial_delay: Duration,
        max_attempts: u8,
    },
    /// Presumably waits the same `delay` before every attempt, up to `max_attempts`.
    Fixed {
        delay: Duration,
        max_attempts: u8,
    },
}
  95
/// A single entry in a thread's conversation history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// A message submitted by the user.
    User(UserMessage),
    /// A response from the model, including its tool uses and their results.
    Agent(AgentMessage),
    /// A marker asking the model to continue; it is sent to the model as a user
    /// message reading "Continue where you left off".
    Resume,
}
 102
 103impl Message {
 104    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 105        match self {
 106            Message::Agent(agent_message) => Some(agent_message),
 107            _ => None,
 108        }
 109    }
 110
 111    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 112        match self {
 113            Message::User(message) => vec![message.to_request()],
 114            Message::Agent(message) => message.to_request(),
 115            Message::Resume => vec![LanguageModelRequestMessage {
 116                role: Role::User,
 117                content: vec!["Continue where you left off".into()],
 118                cache: false,
 119            }],
 120        }
 121    }
 122
 123    pub fn to_markdown(&self) -> String {
 124        match self {
 125            Message::User(message) => message.to_markdown(),
 126            Message::Agent(message) => message.to_markdown(),
 127            Message::Resume => "[resume]\n".into(),
 128        }
 129    }
 130
 131    pub fn role(&self) -> Role {
 132        match self {
 133            Message::User(_) | Message::Resume => Role::User,
 134            Message::Agent(_) => Role::Assistant,
 135        }
 136    }
 137}
 138
/// A message submitted by the user, made of text, mentions, and images.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this user message.
    pub id: UserMessageId,
    /// The message's parts; they are rendered and sent to the model in this order.
    pub content: Vec<UserMessageContent>,
}

/// One part of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text.
    Text(String),
    /// A reference to a file, directory, symbol, selection, thread, rule, or URL,
    /// together with its resolved `content`; requests replace the mention inline by
    /// its link and attach `content` in a tagged context section.
    Mention { uri: MentionUri, content: String },
    /// An attached image.
    Image(LanguageModelImage),
}
 151
 152impl UserMessage {
 153    pub fn to_markdown(&self) -> String {
 154        let mut markdown = String::from("## User\n\n");
 155
 156        for content in &self.content {
 157            match content {
 158                UserMessageContent::Text(text) => {
 159                    markdown.push_str(text);
 160                    markdown.push('\n');
 161                }
 162                UserMessageContent::Image(_) => {
 163                    markdown.push_str("<image />\n");
 164                }
 165                UserMessageContent::Mention { uri, content } => {
 166                    if !content.is_empty() {
 167                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 168                    } else {
 169                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 170                    }
 171                }
 172            }
 173        }
 174
 175        markdown
 176    }
 177
 178    fn to_request(&self) -> LanguageModelRequestMessage {
 179        let mut message = LanguageModelRequestMessage {
 180            role: Role::User,
 181            content: Vec::with_capacity(self.content.len()),
 182            cache: false,
 183        };
 184
 185        const OPEN_CONTEXT: &str = "<context>\n\
 186            The following items were attached by the user. \
 187            They are up-to-date and don't need to be re-read.\n\n";
 188
 189        const OPEN_FILES_TAG: &str = "<files>";
 190        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 191        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 192        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 193        const OPEN_THREADS_TAG: &str = "<threads>";
 194        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 195        const OPEN_RULES_TAG: &str =
 196            "<rules>\nThe user has specified the following rules that should be applied:\n";
 197
 198        let mut file_context = OPEN_FILES_TAG.to_string();
 199        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 200        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 201        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 202        let mut thread_context = OPEN_THREADS_TAG.to_string();
 203        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 204        let mut rules_context = OPEN_RULES_TAG.to_string();
 205
 206        for chunk in &self.content {
 207            let chunk = match chunk {
 208                UserMessageContent::Text(text) => {
 209                    language_model::MessageContent::Text(text.clone())
 210                }
 211                UserMessageContent::Image(value) => {
 212                    language_model::MessageContent::Image(value.clone())
 213                }
 214                UserMessageContent::Mention { uri, content } => {
 215                    match uri {
 216                        MentionUri::File { abs_path } => {
 217                            write!(
 218                                &mut file_context,
 219                                "\n{}",
 220                                MarkdownCodeBlock {
 221                                    tag: &codeblock_tag(abs_path, None),
 222                                    text: &content.to_string(),
 223                                }
 224                            )
 225                            .ok();
 226                        }
 227                        MentionUri::PastedImage => {
 228                            debug_panic!("pasted image URI should not be used in mention content")
 229                        }
 230                        MentionUri::Directory { .. } => {
 231                            write!(&mut directory_context, "\n{}\n", content).ok();
 232                        }
 233                        MentionUri::Symbol {
 234                            abs_path: path,
 235                            line_range,
 236                            ..
 237                        } => {
 238                            write!(
 239                                &mut symbol_context,
 240                                "\n{}",
 241                                MarkdownCodeBlock {
 242                                    tag: &codeblock_tag(path, Some(line_range)),
 243                                    text: content
 244                                }
 245                            )
 246                            .ok();
 247                        }
 248                        MentionUri::Selection {
 249                            abs_path: path,
 250                            line_range,
 251                            ..
 252                        } => {
 253                            write!(
 254                                &mut selection_context,
 255                                "\n{}",
 256                                MarkdownCodeBlock {
 257                                    tag: &codeblock_tag(
 258                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 259                                        Some(line_range)
 260                                    ),
 261                                    text: content
 262                                }
 263                            )
 264                            .ok();
 265                        }
 266                        MentionUri::Thread { .. } => {
 267                            write!(&mut thread_context, "\n{}\n", content).ok();
 268                        }
 269                        MentionUri::TextThread { .. } => {
 270                            write!(&mut thread_context, "\n{}\n", content).ok();
 271                        }
 272                        MentionUri::Rule { .. } => {
 273                            write!(
 274                                &mut rules_context,
 275                                "\n{}",
 276                                MarkdownCodeBlock {
 277                                    tag: "",
 278                                    text: content
 279                                }
 280                            )
 281                            .ok();
 282                        }
 283                        MentionUri::Fetch { url } => {
 284                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 285                        }
 286                    }
 287
 288                    language_model::MessageContent::Text(uri.as_link().to_string())
 289                }
 290            };
 291
 292            message.content.push(chunk);
 293        }
 294
 295        let len_before_context = message.content.len();
 296
 297        if file_context.len() > OPEN_FILES_TAG.len() {
 298            file_context.push_str("</files>\n");
 299            message
 300                .content
 301                .push(language_model::MessageContent::Text(file_context));
 302        }
 303
 304        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 305            directory_context.push_str("</directories>\n");
 306            message
 307                .content
 308                .push(language_model::MessageContent::Text(directory_context));
 309        }
 310
 311        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 312            symbol_context.push_str("</symbols>\n");
 313            message
 314                .content
 315                .push(language_model::MessageContent::Text(symbol_context));
 316        }
 317
 318        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 319            selection_context.push_str("</selections>\n");
 320            message
 321                .content
 322                .push(language_model::MessageContent::Text(selection_context));
 323        }
 324
 325        if thread_context.len() > OPEN_THREADS_TAG.len() {
 326            thread_context.push_str("</threads>\n");
 327            message
 328                .content
 329                .push(language_model::MessageContent::Text(thread_context));
 330        }
 331
 332        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 333            fetch_context.push_str("</fetched_urls>\n");
 334            message
 335                .content
 336                .push(language_model::MessageContent::Text(fetch_context));
 337        }
 338
 339        if rules_context.len() > OPEN_RULES_TAG.len() {
 340            rules_context.push_str("</user_rules>\n");
 341            message
 342                .content
 343                .push(language_model::MessageContent::Text(rules_context));
 344        }
 345
 346        if message.content.len() > len_before_context {
 347            message.content.insert(
 348                len_before_context,
 349                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 350            );
 351            message
 352                .content
 353                .push(language_model::MessageContent::Text("</context>".into()));
 354        }
 355
 356        message
 357    }
 358}
 359
 360fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 361    let mut result = String::new();
 362
 363    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 364        let _ = write!(result, "{} ", extension);
 365    }
 366
 367    let _ = write!(result, "{}", full_path.display());
 368
 369    if let Some(range) = line_range {
 370        if range.start() == range.end() {
 371            let _ = write!(result, ":{}", range.start() + 1);
 372        } else {
 373            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 374        }
 375    }
 376
 377    result
 378}
 379
 380impl AgentMessage {
 381    pub fn to_markdown(&self) -> String {
 382        let mut markdown = String::from("## Assistant\n\n");
 383
 384        for content in &self.content {
 385            match content {
 386                AgentMessageContent::Text(text) => {
 387                    markdown.push_str(text);
 388                    markdown.push('\n');
 389                }
 390                AgentMessageContent::Thinking { text, .. } => {
 391                    markdown.push_str("<think>");
 392                    markdown.push_str(text);
 393                    markdown.push_str("</think>\n");
 394                }
 395                AgentMessageContent::RedactedThinking(_) => {
 396                    markdown.push_str("<redacted_thinking />\n")
 397                }
 398                AgentMessageContent::ToolUse(tool_use) => {
 399                    markdown.push_str(&format!(
 400                        "**Tool Use**: {} (ID: {})\n",
 401                        tool_use.name, tool_use.id
 402                    ));
 403                    markdown.push_str(&format!(
 404                        "{}\n",
 405                        MarkdownCodeBlock {
 406                            tag: "json",
 407                            text: &format!("{:#}", tool_use.input)
 408                        }
 409                    ));
 410                }
 411            }
 412        }
 413
 414        for tool_result in self.tool_results.values() {
 415            markdown.push_str(&format!(
 416                "**Tool Result**: {} (ID: {})\n\n",
 417                tool_result.tool_name, tool_result.tool_use_id
 418            ));
 419            if tool_result.is_error {
 420                markdown.push_str("**ERROR:**\n");
 421            }
 422
 423            match &tool_result.content {
 424                LanguageModelToolResultContent::Text(text) => {
 425                    writeln!(markdown, "{text}\n").ok();
 426                }
 427                LanguageModelToolResultContent::Image(_) => {
 428                    writeln!(markdown, "<image />\n").ok();
 429                }
 430            }
 431
 432            if let Some(output) = tool_result.output.as_ref() {
 433                writeln!(
 434                    markdown,
 435                    "**Debug Output**:\n\n```json\n{}\n```\n",
 436                    serde_json::to_string_pretty(output).unwrap()
 437                )
 438                .unwrap();
 439            }
 440        }
 441
 442        markdown
 443    }
 444
 445    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 446        let mut assistant_message = LanguageModelRequestMessage {
 447            role: Role::Assistant,
 448            content: Vec::with_capacity(self.content.len()),
 449            cache: false,
 450        };
 451        for chunk in &self.content {
 452            match chunk {
 453                AgentMessageContent::Text(text) => {
 454                    assistant_message
 455                        .content
 456                        .push(language_model::MessageContent::Text(text.clone()));
 457                }
 458                AgentMessageContent::Thinking { text, signature } => {
 459                    assistant_message
 460                        .content
 461                        .push(language_model::MessageContent::Thinking {
 462                            text: text.clone(),
 463                            signature: signature.clone(),
 464                        });
 465                }
 466                AgentMessageContent::RedactedThinking(value) => {
 467                    assistant_message.content.push(
 468                        language_model::MessageContent::RedactedThinking(value.clone()),
 469                    );
 470                }
 471                AgentMessageContent::ToolUse(tool_use) => {
 472                    if self.tool_results.contains_key(&tool_use.id) {
 473                        assistant_message
 474                            .content
 475                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 476                    }
 477                }
 478            };
 479        }
 480
 481        let mut user_message = LanguageModelRequestMessage {
 482            role: Role::User,
 483            content: Vec::new(),
 484            cache: false,
 485        };
 486
 487        for tool_result in self.tool_results.values() {
 488            let mut tool_result = tool_result.clone();
 489            // Surprisingly, the API fails if we return an empty string here.
 490            // It thinks we are sending a tool use without a tool result.
 491            if tool_result.content.is_empty() {
 492                tool_result.content = "<Tool returned an empty string>".into();
 493            }
 494            user_message
 495                .content
 496                .push(language_model::MessageContent::ToolResult(tool_result));
 497        }
 498
 499        let mut messages = Vec::new();
 500        if !assistant_message.content.is_empty() {
 501            messages.push(assistant_message);
 502        }
 503        if !user_message.content.is_empty() {
 504            messages.push(user_message);
 505        }
 506        messages
 507    }
 508}
 509
/// A response from the model: its content chunks plus the results of the tools it used.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Content chunks, rendered and sent in this order.
    pub content: Vec<AgentMessageContent>,
    /// Tool results keyed by the ID of the tool use they answer; `IndexMap` keeps
    /// insertion order, which is the order results are rendered and sent in.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
}

/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Response text.
    Text(String),
    /// Reasoning text, with an optional signature that is sent back with it in requests.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Reasoning returned in opaque/redacted form; the payload is replayed verbatim in
    /// requests and rendered only as a placeholder.
    RedactedThinking(String),
    /// A tool invocation requested by the model.
    ToolUse(LanguageModelToolUse),
}
 526
/// A handle to a terminal created for the agent (see `ThreadEnvironment::create_terminal`).
pub trait TerminalHandle {
    /// Returns the terminal's ACP identifier.
    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
    /// Returns the terminal's current output.
    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
    /// Returns a shareable task resolving to the terminal's exit status
    /// (presumably completing when the process exits — confirm in implementors).
    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
}

/// Capabilities the hosting environment provides to a thread.
pub trait ThreadEnvironment {
    /// Creates a terminal running `command`, optionally in `cwd`, resolving to a handle.
    /// `output_byte_limit` presumably caps retained output in bytes (name-based; confirm).
    fn create_terminal(
        &self,
        command: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
}
 542
/// Events emitted by a thread, for example while replaying its history.
#[derive(Debug)]
pub enum ThreadEvent {
    /// A user message.
    UserMessage(UserMessage),
    /// Text produced by the agent.
    AgentText(String),
    /// Reasoning text produced by the agent.
    AgentThinking(String),
    /// A tool call being reported.
    ToolCall(acp::ToolCall),
    /// An update to a previously reported tool call.
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    /// A request for the user to authorize a tool call.
    ToolCallAuthorization(ToolCallAuthorization),
    /// A request is being retried.
    Retry(acp_thread::RetryStatus),
    /// The turn stopped for the given reason.
    Stop(acp::StopReason),
}

/// A request to create a terminal; the outcome is sent back through `response`.
#[derive(Debug)]
pub struct NewTerminal {
    /// Command to run.
    pub command: String,
    /// Optional output cap (presumably in bytes, per the name — confirm).
    pub output_byte_limit: Option<u64>,
    /// Working directory, if one was requested.
    pub cwd: Option<PathBuf>,
    /// Receives the created terminal or the error that prevented creating it.
    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
}

/// A request for the user to pick a permission option for a tool call.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call awaiting authorization.
    pub tool_call: acp::ToolCallUpdate,
    /// Options offered to the user.
    pub options: Vec<acp::PermissionOption>,
    /// Receives the ID of the chosen option.
    pub response: oneshot::Sender<acp::PermissionOptionId>,
}

/// Ways a completion can fail.
#[derive(Debug, thiserror::Error)]
enum CompletionError {
    /// The completion hit the maximum token limit (name-based; confirm at the use site).
    #[error("max tokens")]
    MaxTokens,
    /// The model refused to respond.
    #[error("refusal")]
    Refusal,
    /// Any other error; its message is displayed unchanged.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
 579
/// An agent conversation: its history, model and tool configuration, and the state
/// of the turn currently being run.
pub struct Thread {
    /// ACP session ID identifying this thread.
    id: acp::SessionId,
    /// ID of the current user prompt (see [`PromptId`]).
    prompt_id: PromptId,
    /// When the thread was last updated.
    updated_at: DateTime<Utc>,
    /// Thread title; `None` until one is set (an empty stored title loads as `None`).
    title: Option<SharedString>,
    /// In-flight title-generation task, if any (name-based; not used in the code visible here).
    pending_title_generation: Option<Task<()>>,
    /// Detailed summary of the thread, if available (persisted as `detailed_summary`).
    summary: Option<SharedString>,
    /// Conversation history, in order.
    messages: Vec<Message>,
    /// Completion mode used for requests.
    completion_mode: CompletionMode,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// Agent message not yet committed to `messages` (presumably the one being built
    /// during the running turn — confirm).
    pending_message: Option<AgentMessage>,
    /// The thread's own tools, keyed by name; context-server tools are looked up separately.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// Whether the tool-use limit has been reached.
    tool_use_limit_reached: bool,
    /// Token usage recorded per user message.
    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
    /// Total token usage across the thread.
    // NOTE(review): `allow(unused)` has no stated reason — confirm it is still needed.
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    /// Project snapshot taken when the thread was created, or restored from the database.
    // NOTE(review): this field is read by `to_db`, so the `allow(unused)` may be stale.
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Registry of context servers whose tools are consulted when a tool isn't in `tools`.
    context_server_registry: Entity<ContextServerRegistry>,
    /// Active agent profile.
    profile_id: AgentProfileId,
    /// Project context shared with the thread.
    project_context: Entity<ProjectContext>,
    /// Prompt templates.
    templates: Arc<Templates>,
    /// Selected language model, if any.
    model: Option<Arc<dyn LanguageModel>>,
    /// Model used for summarization, if one is set.
    summarization_model: Option<Arc<dyn LanguageModel>>,
    /// Publishes the prompt capabilities derived from the selected model.
    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
    /// Receiving side of `prompt_capabilities_tx`.
    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
    /// The project this thread operates on.
    pub(crate) project: Entity<Project>,
    /// Action log for the thread.
    pub(crate) action_log: Entity<ActionLog>,
}
 612
 613impl Thread {
 614    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 615        let image = model.map_or(true, |model| model.supports_images());
 616        acp::PromptCapabilities {
 617            meta: None,
 618            image,
 619            audio: false,
 620            embedded_context: true,
 621        }
 622    }
 623
    /// Creates a new, empty thread for `project`.
    ///
    /// The profile and completion mode come from the global [`AgentSettings`]; a fresh
    /// session ID and [`PromptId`] are generated, a new [`ActionLog`] is created, and a
    /// project snapshot is computed asynchronously on the foreground executor and kept
    /// as the thread's initial project snapshot.
    pub fn new(
        project: Entity<Project>,
        project_context: Entity<ProjectContext>,
        context_server_registry: Entity<ContextServerRegistry>,
        templates: Arc<Templates>,
        model: Option<Arc<dyn LanguageModel>>,
        cx: &mut Context<Self>,
    ) -> Self {
        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
        // Capabilities (e.g. image support) depend on the selected model.
        let (prompt_capabilities_tx, prompt_capabilities_rx) =
            watch::channel(Self::prompt_capabilities(model.as_deref()));
        Self {
            id: acp::SessionId(uuid::Uuid::new_v4().to_string().into()),
            prompt_id: PromptId::new(),
            updated_at: Utc::now(),
            title: None,
            pending_title_generation: None,
            summary: None,
            messages: Vec::new(),
            completion_mode: AgentSettings::get_global(cx).preferred_completion_mode,
            running_turn: None,
            pending_message: None,
            tools: BTreeMap::default(),
            tool_use_limit_reached: false,
            request_token_usage: HashMap::default(),
            cumulative_token_usage: TokenUsage::default(),
            initial_project_snapshot: {
                // Shared so the snapshot can be awaited by multiple consumers (e.g. `to_db`).
                let project_snapshot = Self::project_snapshot(project.clone(), cx);
                cx.foreground_executor()
                    .spawn(async move { Some(project_snapshot.await) })
                    .shared()
            },
            context_server_registry,
            profile_id,
            project_context,
            templates,
            model,
            summarization_model: None,
            prompt_capabilities_tx,
            prompt_capabilities_rx,
            project,
            action_log,
        }
    }
 669
    /// Returns this thread's ACP session ID.
    pub fn id(&self) -> &acp::SessionId {
        &self.id
    }
 673
 674    pub fn replay(
 675        &mut self,
 676        cx: &mut Context<Self>,
 677    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 678        let (tx, rx) = mpsc::unbounded();
 679        let stream = ThreadEventStream(tx);
 680        for message in &self.messages {
 681            match message {
 682                Message::User(user_message) => stream.send_user_message(user_message),
 683                Message::Agent(assistant_message) => {
 684                    for content in &assistant_message.content {
 685                        match content {
 686                            AgentMessageContent::Text(text) => stream.send_text(text),
 687                            AgentMessageContent::Thinking { text, .. } => {
 688                                stream.send_thinking(text)
 689                            }
 690                            AgentMessageContent::RedactedThinking(_) => {}
 691                            AgentMessageContent::ToolUse(tool_use) => {
 692                                self.replay_tool_call(
 693                                    tool_use,
 694                                    assistant_message.tool_results.get(&tool_use.id),
 695                                    &stream,
 696                                    cx,
 697                                );
 698                            }
 699                        }
 700                    }
 701                }
 702                Message::Resume => {}
 703            }
 704        }
 705        rx
 706    }
 707
 708    fn replay_tool_call(
 709        &self,
 710        tool_use: &LanguageModelToolUse,
 711        tool_result: Option<&LanguageModelToolResult>,
 712        stream: &ThreadEventStream,
 713        cx: &mut Context<Self>,
 714    ) {
 715        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 716            self.context_server_registry
 717                .read(cx)
 718                .servers()
 719                .find_map(|(_, tools)| {
 720                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 721                        Some(tool.clone())
 722                    } else {
 723                        None
 724                    }
 725                })
 726        });
 727
 728        let Some(tool) = tool else {
 729            stream
 730                .0
 731                .unbounded_send(Ok(ThreadEvent::ToolCall(acp::ToolCall {
 732                    meta: None,
 733                    id: acp::ToolCallId(tool_use.id.to_string().into()),
 734                    title: tool_use.name.to_string(),
 735                    kind: acp::ToolKind::Other,
 736                    status: acp::ToolCallStatus::Failed,
 737                    content: Vec::new(),
 738                    locations: Vec::new(),
 739                    raw_input: Some(tool_use.input.clone()),
 740                    raw_output: None,
 741                })))
 742                .ok();
 743            return;
 744        };
 745
 746        let title = tool.initial_title(tool_use.input.clone(), cx);
 747        let kind = tool.kind();
 748        stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
 749
 750        let output = tool_result
 751            .as_ref()
 752            .and_then(|result| result.output.clone());
 753        if let Some(output) = output.clone() {
 754            let tool_event_stream = ToolCallEventStream::new(
 755                tool_use.id.clone(),
 756                stream.clone(),
 757                Some(self.project.read(cx).fs().clone()),
 758            );
 759            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 760                .log_err();
 761        }
 762
 763        stream.update_tool_call_fields(
 764            &tool_use.id,
 765            acp::ToolCallUpdateFields {
 766                status: Some(
 767                    tool_result
 768                        .as_ref()
 769                        .map_or(acp::ToolCallStatus::Failed, |result| {
 770                            if result.is_error {
 771                                acp::ToolCallStatus::Failed
 772                            } else {
 773                                acp::ToolCallStatus::Completed
 774                            }
 775                        }),
 776                ),
 777                raw_output: output,
 778                ..Default::default()
 779            },
 780        );
 781    }
 782
 783    pub fn from_db(
 784        id: acp::SessionId,
 785        db_thread: DbThread,
 786        project: Entity<Project>,
 787        project_context: Entity<ProjectContext>,
 788        context_server_registry: Entity<ContextServerRegistry>,
 789        action_log: Entity<ActionLog>,
 790        templates: Arc<Templates>,
 791        cx: &mut Context<Self>,
 792    ) -> Self {
 793        let profile_id = db_thread
 794            .profile
 795            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
 796        let model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
 797            db_thread
 798                .model
 799                .and_then(|model| {
 800                    let model = SelectedModel {
 801                        provider: model.provider.clone().into(),
 802                        model: model.model.into(),
 803                    };
 804                    registry.select_model(&model, cx)
 805                })
 806                .or_else(|| registry.default_model())
 807                .map(|model| model.model)
 808        });
 809        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 810            watch::channel(Self::prompt_capabilities(model.as_deref()));
 811
 812        Self {
 813            id,
 814            prompt_id: PromptId::new(),
 815            title: if db_thread.title.is_empty() {
 816                None
 817            } else {
 818                Some(db_thread.title.clone())
 819            },
 820            pending_title_generation: None,
 821            summary: db_thread.detailed_summary,
 822            messages: db_thread.messages,
 823            completion_mode: db_thread.completion_mode.unwrap_or_default(),
 824            running_turn: None,
 825            pending_message: None,
 826            tools: BTreeMap::default(),
 827            tool_use_limit_reached: false,
 828            request_token_usage: db_thread.request_token_usage.clone(),
 829            cumulative_token_usage: db_thread.cumulative_token_usage,
 830            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
 831            context_server_registry,
 832            profile_id,
 833            project_context,
 834            templates,
 835            model,
 836            summarization_model: None,
 837            project,
 838            action_log,
 839            updated_at: db_thread.updated_at,
 840            prompt_capabilities_tx,
 841            prompt_capabilities_rx,
 842        }
 843    }
 844
 845    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
 846        let initial_project_snapshot = self.initial_project_snapshot.clone();
 847        let mut thread = DbThread {
 848            title: self.title(),
 849            messages: self.messages.clone(),
 850            updated_at: self.updated_at,
 851            detailed_summary: self.summary.clone(),
 852            initial_project_snapshot: None,
 853            cumulative_token_usage: self.cumulative_token_usage,
 854            request_token_usage: self.request_token_usage.clone(),
 855            model: self.model.as_ref().map(|model| DbLanguageModel {
 856                provider: model.provider_id().to_string(),
 857                model: model.name().0.to_string(),
 858            }),
 859            completion_mode: Some(self.completion_mode),
 860            profile: Some(self.profile_id.clone()),
 861        };
 862
 863        cx.background_spawn(async move {
 864            let initial_project_snapshot = initial_project_snapshot.await;
 865            thread.initial_project_snapshot = initial_project_snapshot;
 866            thread
 867        })
 868    }
 869
 870    /// Create a snapshot of the current project state including git information and unsaved buffers.
 871    fn project_snapshot(
 872        project: Entity<Project>,
 873        cx: &mut Context<Self>,
 874    ) -> Task<Arc<agent::thread::ProjectSnapshot>> {
 875        let git_store = project.read(cx).git_store().clone();
 876        let worktree_snapshots: Vec<_> = project
 877            .read(cx)
 878            .visible_worktrees(cx)
 879            .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
 880            .collect();
 881
 882        cx.spawn(async move |_, cx| {
 883            let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
 884
 885            let mut unsaved_buffers = Vec::new();
 886            cx.update(|app_cx| {
 887                let buffer_store = project.read(app_cx).buffer_store();
 888                for buffer_handle in buffer_store.read(app_cx).buffers() {
 889                    let buffer = buffer_handle.read(app_cx);
 890                    if buffer.is_dirty()
 891                        && let Some(file) = buffer.file()
 892                    {
 893                        let path = file.path().to_string_lossy().to_string();
 894                        unsaved_buffers.push(path);
 895                    }
 896                }
 897            })
 898            .ok();
 899
 900            Arc::new(ProjectSnapshot {
 901                worktree_snapshots,
 902                unsaved_buffer_paths: unsaved_buffers,
 903                timestamp: Utc::now(),
 904            })
 905        })
 906    }
 907
 908    fn worktree_snapshot(
 909        worktree: Entity<project::Worktree>,
 910        git_store: Entity<GitStore>,
 911        cx: &App,
 912    ) -> Task<agent::thread::WorktreeSnapshot> {
 913        cx.spawn(async move |cx| {
 914            // Get worktree path and snapshot
 915            let worktree_info = cx.update(|app_cx| {
 916                let worktree = worktree.read(app_cx);
 917                let path = worktree.abs_path().to_string_lossy().to_string();
 918                let snapshot = worktree.snapshot();
 919                (path, snapshot)
 920            });
 921
 922            let Ok((worktree_path, _snapshot)) = worktree_info else {
 923                return WorktreeSnapshot {
 924                    worktree_path: String::new(),
 925                    git_state: None,
 926                };
 927            };
 928
 929            let git_state = git_store
 930                .update(cx, |git_store, cx| {
 931                    git_store
 932                        .repositories()
 933                        .values()
 934                        .find(|repo| {
 935                            repo.read(cx)
 936                                .abs_path_to_repo_path(&worktree.read(cx).abs_path())
 937                                .is_some()
 938                        })
 939                        .cloned()
 940                })
 941                .ok()
 942                .flatten()
 943                .map(|repo| {
 944                    repo.update(cx, |repo, _| {
 945                        let current_branch =
 946                            repo.branch.as_ref().map(|branch| branch.name().to_owned());
 947                        repo.send_job(None, |state, _| async move {
 948                            let RepositoryState::Local { backend, .. } = state else {
 949                                return GitState {
 950                                    remote_url: None,
 951                                    head_sha: None,
 952                                    current_branch,
 953                                    diff: None,
 954                                };
 955                            };
 956
 957                            let remote_url = backend.remote_url("origin");
 958                            let head_sha = backend.head_sha().await;
 959                            let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
 960
 961                            GitState {
 962                                remote_url,
 963                                head_sha,
 964                                current_branch,
 965                                diff,
 966                            }
 967                        })
 968                    })
 969                });
 970
 971            let git_state = match git_state {
 972                Some(git_state) => match git_state.ok() {
 973                    Some(git_state) => git_state.await.ok(),
 974                    None => None,
 975                },
 976                None => None,
 977            };
 978
 979            WorktreeSnapshot {
 980                worktree_path,
 981                git_state,
 982            }
 983        })
 984    }
 985
 986    pub fn project_context(&self) -> &Entity<ProjectContext> {
 987        &self.project_context
 988    }
 989
 990    pub fn project(&self) -> &Entity<Project> {
 991        &self.project
 992    }
 993
 994    pub fn action_log(&self) -> &Entity<ActionLog> {
 995        &self.action_log
 996    }
 997
 998    pub fn is_empty(&self) -> bool {
 999        self.messages.is_empty() && self.title.is_none()
1000    }
1001
1002    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
1003        self.model.as_ref()
1004    }
1005
1006    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
1007        let old_usage = self.latest_token_usage();
1008        self.model = Some(model);
1009        let new_caps = Self::prompt_capabilities(self.model.as_deref());
1010        let new_usage = self.latest_token_usage();
1011        if old_usage != new_usage {
1012            cx.emit(TokenUsageUpdated(new_usage));
1013        }
1014        self.prompt_capabilities_tx.send(new_caps).log_err();
1015        cx.notify()
1016    }
1017
1018    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
1019        self.summarization_model.as_ref()
1020    }
1021
1022    pub fn set_summarization_model(
1023        &mut self,
1024        model: Option<Arc<dyn LanguageModel>>,
1025        cx: &mut Context<Self>,
1026    ) {
1027        self.summarization_model = model;
1028        cx.notify()
1029    }
1030
1031    pub fn completion_mode(&self) -> CompletionMode {
1032        self.completion_mode
1033    }
1034
1035    pub fn set_completion_mode(&mut self, mode: CompletionMode, cx: &mut Context<Self>) {
1036        let old_usage = self.latest_token_usage();
1037        self.completion_mode = mode;
1038        let new_usage = self.latest_token_usage();
1039        if old_usage != new_usage {
1040            cx.emit(TokenUsageUpdated(new_usage));
1041        }
1042        cx.notify()
1043    }
1044
1045    #[cfg(any(test, feature = "test-support"))]
1046    pub fn last_message(&self) -> Option<Message> {
1047        if let Some(message) = self.pending_message.clone() {
1048            Some(Message::Agent(message))
1049        } else {
1050            self.messages.last().cloned()
1051        }
1052    }
1053
1054    pub fn add_default_tools(
1055        &mut self,
1056        environment: Rc<dyn ThreadEnvironment>,
1057        cx: &mut Context<Self>,
1058    ) {
1059        let language_registry = self.project.read(cx).languages().clone();
1060        self.add_tool(CopyPathTool::new(self.project.clone()));
1061        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
1062        self.add_tool(DeletePathTool::new(
1063            self.project.clone(),
1064            self.action_log.clone(),
1065        ));
1066        self.add_tool(DiagnosticsTool::new(self.project.clone()));
1067        self.add_tool(EditFileTool::new(
1068            self.project.clone(),
1069            cx.weak_entity(),
1070            language_registry,
1071        ));
1072        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
1073        self.add_tool(FindPathTool::new(self.project.clone()));
1074        self.add_tool(GrepTool::new(self.project.clone()));
1075        self.add_tool(ListDirectoryTool::new(self.project.clone()));
1076        self.add_tool(MovePathTool::new(self.project.clone()));
1077        self.add_tool(NowTool);
1078        self.add_tool(OpenTool::new(self.project.clone()));
1079        self.add_tool(ReadFileTool::new(
1080            self.project.clone(),
1081            self.action_log.clone(),
1082        ));
1083        self.add_tool(TerminalTool::new(self.project.clone(), environment));
1084        self.add_tool(ThinkingTool);
1085        self.add_tool(WebSearchTool);
1086    }
1087
1088    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
1089        self.tools.insert(T::name().into(), tool.erase());
1090    }
1091
1092    pub fn remove_tool(&mut self, name: &str) -> bool {
1093        self.tools.remove(name).is_some()
1094    }
1095
1096    pub fn profile(&self) -> &AgentProfileId {
1097        &self.profile_id
1098    }
1099
1100    pub fn set_profile(&mut self, profile_id: AgentProfileId) {
1101        self.profile_id = profile_id;
1102    }
1103
1104    pub fn cancel(&mut self, cx: &mut Context<Self>) {
1105        if let Some(running_turn) = self.running_turn.take() {
1106            running_turn.cancel();
1107        }
1108        self.flush_pending_message(cx);
1109    }
1110
1111    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1112        let Some(last_user_message) = self.last_user_message() else {
1113            return;
1114        };
1115
1116        self.request_token_usage
1117            .insert(last_user_message.id.clone(), update);
1118        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1119        cx.notify();
1120    }
1121
1122    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1123        self.cancel(cx);
1124        let Some(position) = self.messages.iter().position(
1125            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1126        ) else {
1127            return Err(anyhow!("Message not found"));
1128        };
1129
1130        for message in self.messages.drain(position..) {
1131            match message {
1132                Message::User(message) => {
1133                    self.request_token_usage.remove(&message.id);
1134                }
1135                Message::Agent(_) | Message::Resume => {}
1136            }
1137        }
1138        self.summary = None;
1139        cx.notify();
1140        Ok(())
1141    }
1142
1143    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1144        let last_user_message = self.last_user_message()?;
1145        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1146        let model = self.model.clone()?;
1147
1148        Some(acp_thread::TokenUsage {
1149            max_tokens: model.max_token_count_for_mode(self.completion_mode.into()),
1150            used_tokens: tokens.total_tokens(),
1151        })
1152    }
1153
1154    pub fn resume(
1155        &mut self,
1156        cx: &mut Context<Self>,
1157    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1158        self.messages.push(Message::Resume);
1159        cx.notify();
1160
1161        log::debug!("Total messages in thread: {}", self.messages.len());
1162        self.run_turn(cx)
1163    }
1164
1165    /// Sending a message results in the model streaming a response, which could include tool calls.
1166    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1167    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1168    pub fn send<T>(
1169        &mut self,
1170        id: UserMessageId,
1171        content: impl IntoIterator<Item = T>,
1172        cx: &mut Context<Self>,
1173    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1174    where
1175        T: Into<UserMessageContent>,
1176    {
1177        let model = self.model().context("No language model configured")?;
1178
1179        log::info!("Thread::send called with model: {}", model.name().0);
1180        self.advance_prompt_id();
1181
1182        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1183        log::debug!("Thread::send content: {:?}", content);
1184
1185        self.messages
1186            .push(Message::User(UserMessage { id, content }));
1187        cx.notify();
1188
1189        log::debug!("Total messages in thread: {}", self.messages.len());
1190        self.run_turn(cx)
1191    }
1192
    /// Starts a new agent turn over the current message history.
    ///
    /// Any turn already in progress is cancelled first. The turn runs in a task spawned into
    /// `running_turn`. Its events, plus a final stop reason or error, are delivered through
    /// the returned receiver.
    ///
    /// # Errors
    ///
    /// Returns an error if no language model is configured, or if the thread's profile id is
    /// not present in the global `AgentSettings` profiles.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.cancel(cx);

        let model = self.model.clone().context("No language model configured")?;
        let profile = AgentSettings::get_global(cx)
            .profiles
            .get(&self.profile_id)
            .context("Profile not found")?;
        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
        let event_stream = ThreadEventStream(events_tx);
        // Index of the newest message when the turn starts (0 for an empty history). On a
        // refusal, the history is truncated back to this index. That drops this message and
        // everything the turn appended after it.
        let message_ix = self.messages.len().saturating_sub(1);
        self.tool_use_limit_reached = false;
        self.summary = None;
        self.running_turn = Some(RunningTurn {
            event_stream: event_stream.clone(),
            tools: self.enabled_tools(profile, &model, cx),
            _task: cx.spawn(async move |this, cx| {
                log::debug!("Starting agent turn execution");

                let turn_result = Self::run_turn_internal(&this, model, &event_stream, cx).await;
                // Flush the pending agent message no matter how the turn ended.
                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));

                // Translate the turn's outcome into a stop reason or an error event.
                match turn_result {
                    Ok(()) => {
                        log::debug!("Turn execution completed");
                        event_stream.send_stop(acp::StopReason::EndTurn);
                    }
                    Err(error) => {
                        log::error!("Turn execution failed: {:?}", error);
                        match error.downcast::<CompletionError>() {
                            Ok(CompletionError::Refusal) => {
                                event_stream.send_stop(acp::StopReason::Refusal);
                                // Remove the refused exchange from the history.
                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
                            }
                            Ok(CompletionError::MaxTokens) => {
                                event_stream.send_stop(acp::StopReason::MaxTokens);
                            }
                            Ok(CompletionError::Other(error)) | Err(error) => {
                                event_stream.send_error(error);
                            }
                        }
                    }
                }

                // The turn is finished; clear the handle so no turn is recorded as running.
                _ = this.update(cx, |this, _| this.running_turn.take());
            }),
        });
        Ok(events_rx)
    }
1245
    /// Drives one agent turn: request a completion, run any requested tools, and repeat.
    ///
    /// Each loop iteration does the following:
    /// 1. Builds a request for the current `intent` and streams the model's response.
    /// 2. Awaits every tool task the response started.
    /// 3. Flushes the pending message and kicks off title generation if needed.
    /// 4. Decides what happens next:
    ///    - retry after a stream error (if `handle_completion_error` allows it);
    ///    - fail if the tool-use limit was reached;
    ///    - finish if no tools were requested;
    ///    - otherwise loop again to send the tool results back to the model.
    ///
    /// # Errors
    ///
    /// Errors returned by `handle_completion_event` (e.g. a refusal or max-tokens stop) are
    /// propagated immediately. So are non-retryable stream errors, retries beyond the limit,
    /// `ToolUseLimitReachedError`, and failures to access the thread entity.
    async fn run_turn_internal(
        this: &WeakEntity<Self>,
        model: Arc<dyn LanguageModel>,
        event_stream: &ThreadEventStream,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        // Number of consecutive retries; reset after every completion that ran tools.
        let mut attempt = 0;
        let mut intent = CompletionIntent::UserPrompt;
        loop {
            let request =
                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;

            telemetry::event!(
                "Agent Thread Completion",
                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
                model = model.telemetry_id(),
                model_provider = model.provider_id().to_string(),
                attempt
            );

            log::debug!("Calling model.stream_completion, attempt {}", attempt);
            let mut events = model
                .stream_completion(request, cx)
                .await
                .map_err(|error| anyhow!(error))?;
            let mut tool_results = FuturesUnordered::new();
            let mut error = None;
            while let Some(event) = events.next().await {
                log::trace!("Received completion event: {:?}", event);
                match event {
                    Ok(event) => {
                        // `handle_completion_event` returns `Option<Task>`; extending adds a
                        // task only when the event started one.
                        tool_results.extend(this.update(cx, |this, cx| {
                            this.handle_completion_event(event, event_stream, cx)
                        })??);
                    }
                    Err(err) => {
                        // Stop reading the stream; the error decides on a retry below, after
                        // any already-started tools have finished.
                        error = Some(err);
                        break;
                    }
                }
            }

            // Must be computed before draining, which empties `tool_results`.
            let end_turn = tool_results.is_empty();
            while let Some(tool_result) = tool_results.next().await {
                log::debug!("Tool finished {:?}", tool_result);

                event_stream.update_tool_call_fields(
                    &tool_result.tool_use_id,
                    acp::ToolCallUpdateFields {
                        status: Some(if tool_result.is_error {
                            acp::ToolCallStatus::Failed
                        } else {
                            acp::ToolCallStatus::Completed
                        }),
                        raw_output: tool_result.output.clone(),
                        ..Default::default()
                    },
                );
                this.update(cx, |this, _cx| {
                    this.pending_message()
                        .tool_results
                        .insert(tool_result.tool_use_id.clone(), tool_result);
                })?;
            }

            this.update(cx, |this, cx| {
                this.flush_pending_message(cx);
                if this.title.is_none() && this.pending_title_generation.is_none() {
                    this.generate_title(cx);
                }
            })?;

            if let Some(error) = error {
                attempt += 1;
                let retry =
                    this.update(cx, |this, _| this.handle_completion_error(error, attempt))??;
                let timer = cx.background_executor().timer(retry.duration);
                event_stream.send_retry(retry);
                timer.await;
                // If the interrupted response produced no tool results, append a `Resume`
                // marker and re-send as a user prompt rather than as tool results.
                this.update(cx, |this, _cx| {
                    if let Some(Message::Agent(message)) = this.messages.last() {
                        if message.tool_results.is_empty() {
                            intent = CompletionIntent::UserPrompt;
                            this.messages.push(Message::Resume);
                        }
                    }
                })?;
            } else if this.read_with(cx, |this, _| this.tool_use_limit_reached)? {
                return Err(language_model::ToolUseLimitReachedError.into());
            } else if end_turn {
                return Ok(());
            } else {
                // Tools ran successfully: send their results back on the next iteration.
                intent = CompletionIntent::ToolResults;
                attempt = 0;
            }
        }
    }
1344
1345    fn handle_completion_error(
1346        &mut self,
1347        error: LanguageModelCompletionError,
1348        attempt: u8,
1349    ) -> Result<acp_thread::RetryStatus> {
1350        if self.completion_mode == CompletionMode::Normal {
1351            return Err(anyhow!(error));
1352        }
1353
1354        let Some(strategy) = Self::retry_strategy_for(&error) else {
1355            return Err(anyhow!(error));
1356        };
1357
1358        let max_attempts = match &strategy {
1359            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1360            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1361        };
1362
1363        if attempt > max_attempts {
1364            return Err(anyhow!(error));
1365        }
1366
1367        let delay = match &strategy {
1368            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1369                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1370                Duration::from_secs(delay_secs)
1371            }
1372            RetryStrategy::Fixed { delay, .. } => *delay,
1373        };
1374        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1375
1376        Ok(acp_thread::RetryStatus {
1377            last_error: error.to_string().into(),
1378            attempt: attempt as usize,
1379            max_attempts: max_attempts as usize,
1380            started_at: Instant::now(),
1381            duration: delay,
1382        })
1383    }
1384
1385    /// A helper method that's called on every streamed completion event.
1386    /// Returns an optional tool result task, which the main agentic loop will
1387    /// send back to the model when it resolves.
1388    fn handle_completion_event(
1389        &mut self,
1390        event: LanguageModelCompletionEvent,
1391        event_stream: &ThreadEventStream,
1392        cx: &mut Context<Self>,
1393    ) -> Result<Option<Task<LanguageModelToolResult>>> {
1394        log::trace!("Handling streamed completion event: {:?}", event);
1395        use LanguageModelCompletionEvent::*;
1396
1397        match event {
1398            StartMessage { .. } => {
1399                self.flush_pending_message(cx);
1400                self.pending_message = Some(AgentMessage::default());
1401            }
1402            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
1403            Thinking { text, signature } => {
1404                self.handle_thinking_event(text, signature, event_stream, cx)
1405            }
1406            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
1407            ToolUse(tool_use) => {
1408                return Ok(self.handle_tool_use_event(tool_use, event_stream, cx));
1409            }
1410            ToolUseJsonParseError {
1411                id,
1412                tool_name,
1413                raw_input,
1414                json_parse_error,
1415            } => {
1416                return Ok(Some(Task::ready(
1417                    self.handle_tool_use_json_parse_error_event(
1418                        id,
1419                        tool_name,
1420                        raw_input,
1421                        json_parse_error,
1422                    ),
1423                )));
1424            }
1425            UsageUpdate(usage) => {
1426                telemetry::event!(
1427                    "Agent Thread Completion Usage Updated",
1428                    thread_id = self.id.to_string(),
1429                    prompt_id = self.prompt_id.to_string(),
1430                    model = self.model.as_ref().map(|m| m.telemetry_id()),
1431                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
1432                    input_tokens = usage.input_tokens,
1433                    output_tokens = usage.output_tokens,
1434                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
1435                    cache_read_input_tokens = usage.cache_read_input_tokens,
1436                );
1437                self.update_token_usage(usage, cx);
1438            }
1439            StatusUpdate(CompletionRequestStatus::UsageUpdated { amount, limit }) => {
1440                self.update_model_request_usage(amount, limit, cx);
1441            }
1442            StatusUpdate(
1443                CompletionRequestStatus::Started
1444                | CompletionRequestStatus::Queued { .. }
1445                | CompletionRequestStatus::Failed { .. },
1446            ) => {}
1447            StatusUpdate(CompletionRequestStatus::ToolUseLimitReached) => {
1448                self.tool_use_limit_reached = true;
1449            }
1450            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
1451            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
1452            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
1453        }
1454
1455        Ok(None)
1456    }
1457
1458    fn handle_text_event(
1459        &mut self,
1460        new_text: String,
1461        event_stream: &ThreadEventStream,
1462        cx: &mut Context<Self>,
1463    ) {
1464        event_stream.send_text(&new_text);
1465
1466        let last_message = self.pending_message();
1467        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1468            text.push_str(&new_text);
1469        } else {
1470            last_message
1471                .content
1472                .push(AgentMessageContent::Text(new_text));
1473        }
1474
1475        cx.notify();
1476    }
1477
1478    fn handle_thinking_event(
1479        &mut self,
1480        new_text: String,
1481        new_signature: Option<String>,
1482        event_stream: &ThreadEventStream,
1483        cx: &mut Context<Self>,
1484    ) {
1485        event_stream.send_thinking(&new_text);
1486
1487        let last_message = self.pending_message();
1488        if let Some(AgentMessageContent::Thinking { text, signature }) =
1489            last_message.content.last_mut()
1490        {
1491            text.push_str(&new_text);
1492            *signature = new_signature.or(signature.take());
1493        } else {
1494            last_message.content.push(AgentMessageContent::Thinking {
1495                text: new_text,
1496                signature: new_signature,
1497            });
1498        }
1499
1500        cx.notify();
1501    }
1502
1503    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
1504        let last_message = self.pending_message();
1505        last_message
1506            .content
1507            .push(AgentMessageContent::RedactedThinking(data));
1508        cx.notify();
1509    }
1510
    /// Records a (possibly partial) tool use in the pending message and, once its input is
    /// complete, starts the tool.
    ///
    /// A tool use with a new id is announced with `send_tool_call`. A repeated id for the last
    /// tool use in the message updates that tool call's title, kind, and raw input instead.
    ///
    /// Returns `None` while the input is still streaming. Once the input is complete, returns:
    /// - a task resolving to the tool's result; or
    /// - an immediately-ready error result when no tool with that name is registered.
    fn handle_tool_use_event(
        &mut self,
        tool_use: LanguageModelToolUse,
        event_stream: &ThreadEventStream,
        cx: &mut Context<Self>,
    ) -> Option<Task<LanguageModelToolResult>> {
        cx.notify();

        // Unknown tools fall back to the raw tool name and `ToolKind::Other`.
        let tool = self.tool(tool_use.name.as_ref());
        let mut title = SharedString::from(&tool_use.name);
        let mut kind = acp::ToolKind::Other;
        if let Some(tool) = tool.as_ref() {
            title = tool.initial_title(tool_use.input.clone(), cx);
            kind = tool.kind();
        }

        // Ensure the last message ends in the current tool use
        let last_message = self.pending_message();
        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
            if let AgentMessageContent::ToolUse(last_tool_use) = content {
                if last_tool_use.id == tool_use.id {
                    // Same tool use streamed again: replace it in place with the newer input.
                    *last_tool_use = tool_use.clone();
                    false
                } else {
                    true
                }
            } else {
                true
            }
        });

        if push_new_tool_use {
            event_stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
            last_message
                .content
                .push(AgentMessageContent::ToolUse(tool_use.clone()));
        } else {
            event_stream.update_tool_call_fields(
                &tool_use.id,
                acp::ToolCallUpdateFields {
                    title: Some(title.into()),
                    kind: Some(kind),
                    raw_input: Some(tool_use.input.clone()),
                    ..Default::default()
                },
            );
        }

        // Partial input: wait for more chunks before running anything.
        if !tool_use.is_input_complete {
            return None;
        }

        let Some(tool) = tool else {
            let content = format!("No tool named {} exists", tool_use.name);
            return Some(Task::ready(LanguageModelToolResult {
                content: LanguageModelToolResultContent::Text(Arc::from(content)),
                tool_use_id: tool_use.id,
                tool_name: tool_use.name,
                is_error: true,
                output: None,
            }));
        };

        let fs = self.project.read(cx).fs().clone();
        let tool_event_stream =
            ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
        tool_event_stream.update_fields(acp::ToolCallUpdateFields {
            status: Some(acp::ToolCallStatus::InProgress),
            ..Default::default()
        });
        // Captured now so the spawned task doesn't need to read the thread again.
        let supports_images = self.model().is_some_and(|model| model.supports_images());
        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
        log::debug!("Running tool {}", tool_use.name);
        Some(cx.foreground_executor().spawn(async move {
            // An image result is turned into an error if the current model can't accept images.
            let tool_result = tool_result.await.and_then(|output| {
                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
                    && !supports_images
                {
                    return Err(anyhow!(
                        "Attempted to read an image, but this model doesn't support it.",
                    ));
                }
                Ok(output)
            });

            match tool_result {
                Ok(output) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: false,
                    content: output.llm_output,
                    output: Some(output.raw_output),
                },
                Err(error) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: true,
                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
                    output: Some(error.to_string().into()),
                },
            }
        }))
    }
1614
1615    fn handle_tool_use_json_parse_error_event(
1616        &mut self,
1617        tool_use_id: LanguageModelToolUseId,
1618        tool_name: Arc<str>,
1619        raw_input: Arc<str>,
1620        json_parse_error: String,
1621    ) -> LanguageModelToolResult {
1622        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
1623        LanguageModelToolResult {
1624            tool_use_id,
1625            tool_name,
1626            is_error: true,
1627            content: LanguageModelToolResultContent::Text(tool_output.into()),
1628            output: Some(serde_json::Value::String(raw_input.to_string())),
1629        }
1630    }
1631
1632    fn update_model_request_usage(&self, amount: usize, limit: UsageLimit, cx: &mut Context<Self>) {
1633        self.project
1634            .read(cx)
1635            .user_store()
1636            .update(cx, |user_store, cx| {
1637                user_store.update_model_request_usage(
1638                    ModelRequestUsage(RequestUsage {
1639                        amount: amount as i32,
1640                        limit,
1641                    }),
1642                    cx,
1643                )
1644            });
1645    }
1646
1647    pub fn title(&self) -> SharedString {
1648        self.title.clone().unwrap_or("New Thread".into())
1649    }
1650
1651    pub fn summary(&mut self, cx: &mut Context<Self>) -> Task<Result<SharedString>> {
1652        if let Some(summary) = self.summary.as_ref() {
1653            return Task::ready(Ok(summary.clone()));
1654        }
1655        let Some(model) = self.summarization_model.clone() else {
1656            return Task::ready(Err(anyhow!("No summarization model available")));
1657        };
1658        let mut request = LanguageModelRequest {
1659            intent: Some(CompletionIntent::ThreadContextSummarization),
1660            temperature: AgentSettings::temperature_for_model(&model, cx),
1661            ..Default::default()
1662        };
1663
1664        for message in &self.messages {
1665            request.messages.extend(message.to_request());
1666        }
1667
1668        request.messages.push(LanguageModelRequestMessage {
1669            role: Role::User,
1670            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
1671            cache: false,
1672        });
1673        cx.spawn(async move |this, cx| {
1674            let mut summary = String::new();
1675            let mut messages = model.stream_completion(request, cx).await?;
1676            while let Some(event) = messages.next().await {
1677                let event = event?;
1678                let text = match event {
1679                    LanguageModelCompletionEvent::Text(text) => text,
1680                    LanguageModelCompletionEvent::StatusUpdate(
1681                        CompletionRequestStatus::UsageUpdated { amount, limit },
1682                    ) => {
1683                        this.update(cx, |thread, cx| {
1684                            thread.update_model_request_usage(amount, limit, cx);
1685                        })?;
1686                        continue;
1687                    }
1688                    _ => continue,
1689                };
1690
1691                let mut lines = text.lines();
1692                summary.extend(lines.next());
1693            }
1694
1695            log::debug!("Setting summary: {}", summary);
1696            let summary = SharedString::from(summary);
1697
1698            this.update(cx, |this, cx| {
1699                this.summary = Some(summary.clone());
1700                cx.notify()
1701            })?;
1702
1703            Ok(summary)
1704        })
1705    }
1706
1707    fn generate_title(&mut self, cx: &mut Context<Self>) {
1708        let Some(model) = self.summarization_model.clone() else {
1709            return;
1710        };
1711
1712        log::debug!(
1713            "Generating title with model: {:?}",
1714            self.summarization_model.as_ref().map(|model| model.name())
1715        );
1716        let mut request = LanguageModelRequest {
1717            intent: Some(CompletionIntent::ThreadSummarization),
1718            temperature: AgentSettings::temperature_for_model(&model, cx),
1719            ..Default::default()
1720        };
1721
1722        for message in &self.messages {
1723            request.messages.extend(message.to_request());
1724        }
1725
1726        request.messages.push(LanguageModelRequestMessage {
1727            role: Role::User,
1728            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
1729            cache: false,
1730        });
1731        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
1732            let mut title = String::new();
1733
1734            let generate = async {
1735                let mut messages = model.stream_completion(request, cx).await?;
1736                while let Some(event) = messages.next().await {
1737                    let event = event?;
1738                    let text = match event {
1739                        LanguageModelCompletionEvent::Text(text) => text,
1740                        LanguageModelCompletionEvent::StatusUpdate(
1741                            CompletionRequestStatus::UsageUpdated { amount, limit },
1742                        ) => {
1743                            this.update(cx, |thread, cx| {
1744                                thread.update_model_request_usage(amount, limit, cx);
1745                            })?;
1746                            continue;
1747                        }
1748                        _ => continue,
1749                    };
1750
1751                    let mut lines = text.lines();
1752                    title.extend(lines.next());
1753
1754                    // Stop if the LLM generated multiple lines.
1755                    if lines.next().is_some() {
1756                        break;
1757                    }
1758                }
1759                anyhow::Ok(())
1760            };
1761
1762            if generate.await.context("failed to generate title").is_ok() {
1763                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
1764            }
1765            _ = this.update(cx, |this, _| this.pending_title_generation = None);
1766        }));
1767    }
1768
1769    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1770        self.pending_title_generation = None;
1771        if Some(&title) != self.title.as_ref() {
1772            self.title = Some(title);
1773            cx.emit(TitleUpdated);
1774            cx.notify();
1775        }
1776    }
1777
1778    fn last_user_message(&self) -> Option<&UserMessage> {
1779        self.messages
1780            .iter()
1781            .rev()
1782            .find_map(|message| match message {
1783                Message::User(user_message) => Some(user_message),
1784                Message::Agent(_) => None,
1785                Message::Resume => None,
1786            })
1787    }
1788
1789    fn pending_message(&mut self) -> &mut AgentMessage {
1790        self.pending_message.get_or_insert_default()
1791    }
1792
1793    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
1794        let Some(mut message) = self.pending_message.take() else {
1795            return;
1796        };
1797
1798        if message.content.is_empty() {
1799            return;
1800        }
1801
1802        for content in &message.content {
1803            let AgentMessageContent::ToolUse(tool_use) = content else {
1804                continue;
1805            };
1806
1807            if !message.tool_results.contains_key(&tool_use.id) {
1808                message.tool_results.insert(
1809                    tool_use.id.clone(),
1810                    LanguageModelToolResult {
1811                        tool_use_id: tool_use.id.clone(),
1812                        tool_name: tool_use.name.clone(),
1813                        is_error: true,
1814                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
1815                        output: None,
1816                    },
1817                );
1818            }
1819        }
1820
1821        self.messages.push(Message::Agent(message));
1822        self.updated_at = Utc::now();
1823        self.summary = None;
1824        cx.notify()
1825    }
1826
1827    pub(crate) fn build_completion_request(
1828        &self,
1829        completion_intent: CompletionIntent,
1830        cx: &App,
1831    ) -> Result<LanguageModelRequest> {
1832        let model = self.model().context("No language model configured")?;
1833        let tools = if let Some(turn) = self.running_turn.as_ref() {
1834            turn.tools
1835                .iter()
1836                .filter_map(|(tool_name, tool)| {
1837                    log::trace!("Including tool: {}", tool_name);
1838                    Some(LanguageModelRequestTool {
1839                        name: tool_name.to_string(),
1840                        description: tool.description().to_string(),
1841                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1842                    })
1843                })
1844                .collect::<Vec<_>>()
1845        } else {
1846            Vec::new()
1847        };
1848
1849        log::debug!("Building completion request");
1850        log::debug!("Completion intent: {:?}", completion_intent);
1851        log::debug!("Completion mode: {:?}", self.completion_mode);
1852
1853        let messages = self.build_request_messages(cx);
1854        log::debug!("Request will include {} messages", messages.len());
1855        log::debug!("Request includes {} tools", tools.len());
1856
1857        let request = LanguageModelRequest {
1858            thread_id: Some(self.id.to_string()),
1859            prompt_id: Some(self.prompt_id.to_string()),
1860            intent: Some(completion_intent),
1861            mode: Some(self.completion_mode.into()),
1862            messages,
1863            tools,
1864            tool_choice: None,
1865            stop: Vec::new(),
1866            temperature: AgentSettings::temperature_for_model(model, cx),
1867            thinking_allowed: true,
1868        };
1869
1870        log::debug!("Completion request built successfully");
1871        Ok(request)
1872    }
1873
1874    fn enabled_tools(
1875        &self,
1876        profile: &AgentProfileSettings,
1877        model: &Arc<dyn LanguageModel>,
1878        cx: &App,
1879    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
1880        fn truncate(tool_name: &SharedString) -> SharedString {
1881            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
1882                let mut truncated = tool_name.to_string();
1883                truncated.truncate(MAX_TOOL_NAME_LENGTH);
1884                truncated.into()
1885            } else {
1886                tool_name.clone()
1887            }
1888        }
1889
1890        let mut tools = self
1891            .tools
1892            .iter()
1893            .filter_map(|(tool_name, tool)| {
1894                if tool.supported_provider(&model.provider_id())
1895                    && profile.is_tool_enabled(tool_name)
1896                {
1897                    Some((truncate(tool_name), tool.clone()))
1898                } else {
1899                    None
1900                }
1901            })
1902            .collect::<BTreeMap<_, _>>();
1903
1904        let mut context_server_tools = Vec::new();
1905        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
1906        let mut duplicate_tool_names = HashSet::default();
1907        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
1908            for (tool_name, tool) in server_tools {
1909                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
1910                    let tool_name = truncate(tool_name);
1911                    if !seen_tools.insert(tool_name.clone()) {
1912                        duplicate_tool_names.insert(tool_name.clone());
1913                    }
1914                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
1915                }
1916            }
1917        }
1918
1919        // When there are duplicate tool names, disambiguate by prefixing them
1920        // with the server ID. In the rare case there isn't enough space for the
1921        // disambiguated tool name, keep only the last tool with this name.
1922        for (server_id, tool_name, tool) in context_server_tools {
1923            if duplicate_tool_names.contains(&tool_name) {
1924                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
1925                if available >= 2 {
1926                    let mut disambiguated = server_id.0.to_string();
1927                    disambiguated.truncate(available - 1);
1928                    disambiguated.push('_');
1929                    disambiguated.push_str(&tool_name);
1930                    tools.insert(disambiguated.into(), tool.clone());
1931                } else {
1932                    tools.insert(tool_name, tool.clone());
1933                }
1934            } else {
1935                tools.insert(tool_name, tool.clone());
1936            }
1937        }
1938
1939        tools
1940    }
1941
1942    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
1943        self.running_turn.as_ref()?.tools.get(name).cloned()
1944    }
1945
1946    fn build_request_messages(&self, cx: &App) -> Vec<LanguageModelRequestMessage> {
1947        log::trace!(
1948            "Building request messages from {} thread messages",
1949            self.messages.len()
1950        );
1951
1952        let system_prompt = SystemPromptTemplate {
1953            project: self.project_context.read(cx),
1954            available_tools: self.tools.keys().cloned().collect(),
1955        }
1956        .render(&self.templates)
1957        .context("failed to build system prompt")
1958        .expect("Invalid template");
1959        let mut messages = vec![LanguageModelRequestMessage {
1960            role: Role::System,
1961            content: vec![system_prompt.into()],
1962            cache: false,
1963        }];
1964        for message in &self.messages {
1965            messages.extend(message.to_request());
1966        }
1967
1968        if let Some(last_message) = messages.last_mut() {
1969            last_message.cache = true;
1970        }
1971
1972        if let Some(message) = self.pending_message.as_ref() {
1973            messages.extend(message.to_request());
1974        }
1975
1976        messages
1977    }
1978
1979    pub fn to_markdown(&self) -> String {
1980        let mut markdown = String::new();
1981        for (ix, message) in self.messages.iter().enumerate() {
1982            if ix > 0 {
1983                markdown.push('\n');
1984            }
1985            markdown.push_str(&message.to_markdown());
1986        }
1987
1988        if let Some(message) = self.pending_message.as_ref() {
1989            markdown.push('\n');
1990            markdown.push_str(&message.to_markdown());
1991        }
1992
1993        markdown
1994    }
1995
1996    fn advance_prompt_id(&mut self) {
1997        self.prompt_id = PromptId::new();
1998    }
1999
    /// Chooses how (and whether) to retry a failed completion request.
    ///
    /// Returns `None` when retrying cannot help. Otherwise returns a
    /// `RetryStrategy` with a delay and a `max_attempts` limit. Arms are matched
    /// in order, so the specific `HttpResponseError` patterns must stay ahead of
    /// the general ones.
    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
        use LanguageModelCompletionError::*;
        use http_client::StatusCode;

        // General strategy here:
        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we
        //   don't retry at all.
        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to
        //   MAX_RETRY_ATTEMPTS times. HTTP 429 responses use exponential backoff. The other cases
        //   use a fixed delay that honors the provider's `retry_after` hint when present.
        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), use
        //   `max_attempts: 3`.
        // - Remaining errors get a small, fixed `max_attempts` (1-3); see the individual arms.
        match error {
            HttpResponseError {
                status_code: StatusCode::TOO_MANY_REQUESTS,
                ..
            } => Some(RetryStrategy::ExponentialBackoff {
                initial_delay: BASE_RETRY_DELAY,
                max_attempts: MAX_RETRY_ATTEMPTS,
            }),
            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
                Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    max_attempts: MAX_RETRY_ATTEMPTS,
                })
            }
            // Errors relayed from an upstream provider: decide by the upstream HTTP status.
            UpstreamProviderError {
                status,
                retry_after,
                ..
            } => match *status {
                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
                    Some(RetryStrategy::Fixed {
                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                        max_attempts: MAX_RETRY_ATTEMPTS,
                    })
                }
                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    // Internal Server Error could be anything, retry up to 3 times.
                    max_attempts: 3,
                }),
                status => {
                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
                    // but we frequently get them in practice. See https://http.dev/529
                    if status.as_u16() == 529 {
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: MAX_RETRY_ATTEMPTS,
                        })
                    } else {
                        // Any other upstream status gets a couple of attempts.
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: 2,
                        })
                    }
                }
            },
            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            // Transport/response-handling failures that might succeed on another attempt.
            ApiReadResponseError { .. }
            | HttpSend { .. }
            | DeserializeResponse { .. }
            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            // Retrying these errors definitely shouldn't help.
            HttpResponseError {
                status_code:
                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
                ..
            }
            | AuthenticationError { .. }
            | PermissionError { .. }
            | NoApiKey { .. }
            | ApiEndpointNotFound { .. }
            | PromptTooLarge { .. } => None,
            // These errors might be transient, so allow a single attempt (`max_attempts: 1`).
            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 1,
            }),
            // All other 4xx and 5xx responses get `max_attempts: 3`.
            HttpResponseError { status_code, .. }
                if status_code.is_client_error() || status_code.is_server_error() =>
            {
                Some(RetryStrategy::Fixed {
                    delay: BASE_RETRY_DELAY,
                    max_attempts: 3,
                })
            }
            Other(err)
                if err.is::<language_model::PaymentRequiredError>()
                    || err.is::<language_model::ModelRequestLimitReachedError>() =>
            {
                // Retrying won't help for Payment Required or Model Request Limit errors (where
                // the user must upgrade to usage-based billing to get more requests, or else wait
                // for a significant amount of time for the request limit to reset).
                None
            }
            // Everything left (non-4xx/5xx HTTP responses and any other error) gets
            // `max_attempts: 2`. NOTE(review): these are retried, not treated as
            // non-retryable. Confirm this is the intended policy.
            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 2,
            }),
        }
    }
2106}
2107
2108struct RunningTurn {
2109    /// Holds the task that handles agent interaction until the end of the turn.
2110    /// Survives across multiple requests as the model performs tool calls and
2111    /// we run tools, report their results.
2112    _task: Task<()>,
2113    /// The current event stream for the running turn. Used to report a final
2114    /// cancellation event if we cancel the turn.
2115    event_stream: ThreadEventStream,
2116    /// The tools that were enabled for this turn.
2117    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
2118}
2119
2120impl RunningTurn {
2121    fn cancel(self) {
2122        log::debug!("Cancelling in progress turn");
2123        self.event_stream.send_canceled();
2124    }
2125}
2126
/// Event carrying the thread's updated token usage. The payload is optional.
pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);

impl EventEmitter<TokenUsageUpdated> for Thread {}

/// Event emitted when the thread's title changes (e.g. from `Thread::set_title`).
pub struct TitleUpdated;

impl EventEmitter<TitleUpdated> for Thread {}
2134
2135pub trait AgentTool
2136where
2137    Self: 'static + Sized,
2138{
2139    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
2140    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
2141
2142    fn name() -> &'static str;
2143
2144    fn description(&self) -> SharedString {
2145        let schema = schemars::schema_for!(Self::Input);
2146        SharedString::new(
2147            schema
2148                .get("description")
2149                .and_then(|description| description.as_str())
2150                .unwrap_or_default(),
2151        )
2152    }
2153
2154    fn kind() -> acp::ToolKind;
2155
2156    /// The initial tool title to display. Can be updated during the tool run.
2157    fn initial_title(
2158        &self,
2159        input: Result<Self::Input, serde_json::Value>,
2160        cx: &mut App,
2161    ) -> SharedString;
2162
2163    /// Returns the JSON schema that describes the tool's input.
2164    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Schema {
2165        crate::tool_schema::root_schema_for::<Self::Input>(format)
2166    }
2167
2168    /// Some tools rely on a provider for the underlying billing or other reasons.
2169    /// Allow the tool to check if they are compatible, or should be filtered out.
2170    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
2171        true
2172    }
2173
2174    /// Runs the tool with the provided input.
2175    fn run(
2176        self: Arc<Self>,
2177        input: Self::Input,
2178        event_stream: ToolCallEventStream,
2179        cx: &mut App,
2180    ) -> Task<Result<Self::Output>>;
2181
2182    /// Emits events for a previous execution of the tool.
2183    fn replay(
2184        &self,
2185        _input: Self::Input,
2186        _output: Self::Output,
2187        _event_stream: ToolCallEventStream,
2188        _cx: &mut App,
2189    ) -> Result<()> {
2190        Ok(())
2191    }
2192
2193    fn erase(self) -> Arc<dyn AnyAgentTool> {
2194        Arc::new(Erased(Arc::new(self)))
2195    }
2196}
2197
/// Adapter that implements [`AnyAgentTool`] for a typed [`AgentTool`] held in
/// an `Arc`.
pub struct Erased<T>(T);

/// Result of running a type-erased tool.
pub struct AgentToolOutput {
    /// Content returned to the language model.
    pub llm_output: LanguageModelToolResultContent,
    /// The tool's output serialized as JSON.
    pub raw_output: serde_json::Value,
}
2204
/// Type-erased, object-safe form of [`AgentTool`].
///
/// Inputs and outputs are exchanged as `serde_json::Value`, so tools with
/// different types can be stored together as `Arc<dyn AnyAgentTool>`.
pub trait AnyAgentTool {
    /// Name of the tool.
    fn name(&self) -> SharedString;
    /// Description of the tool.
    fn description(&self) -> SharedString;
    /// Kind of the tool, reported on its tool calls.
    fn kind(&self) -> acp::ToolKind;
    /// Initial title for a tool call with the given raw JSON input.
    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
    /// JSON schema describing the tool's input for the given schema format.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
    /// Whether the tool may be offered for the given provider; defaults to `true`.
    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }
    /// Runs the tool on raw JSON input. The output holds both the content for
    /// the model and the raw JSON output.
    fn run(
        self: Arc<Self>,
        input: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<AgentToolOutput>>;
    /// Emits events for a previous execution from its recorded JSON input and output.
    fn replay(
        &self,
        input: serde_json::Value,
        output: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Result<()>;
}
2228
2229impl<T> AnyAgentTool for Erased<Arc<T>>
2230where
2231    T: AgentTool,
2232{
2233    fn name(&self) -> SharedString {
2234        T::name().into()
2235    }
2236
2237    fn description(&self) -> SharedString {
2238        self.0.description()
2239    }
2240
2241    fn kind(&self) -> agent_client_protocol::ToolKind {
2242        T::kind()
2243    }
2244
2245    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2246        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2247        self.0.initial_title(parsed_input, _cx)
2248    }
2249
2250    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2251        let mut json = serde_json::to_value(self.0.input_schema(format))?;
2252        adapt_schema_to_format(&mut json, format)?;
2253        Ok(json)
2254    }
2255
2256    fn supported_provider(&self, provider: &LanguageModelProviderId) -> bool {
2257        self.0.supported_provider(provider)
2258    }
2259
2260    fn run(
2261        self: Arc<Self>,
2262        input: serde_json::Value,
2263        event_stream: ToolCallEventStream,
2264        cx: &mut App,
2265    ) -> Task<Result<AgentToolOutput>> {
2266        cx.spawn(async move |cx| {
2267            let input = serde_json::from_value(input)?;
2268            let output = cx
2269                .update(|cx| self.0.clone().run(input, event_stream, cx))?
2270                .await?;
2271            let raw_output = serde_json::to_value(&output)?;
2272            Ok(AgentToolOutput {
2273                llm_output: output.into(),
2274                raw_output,
2275            })
2276        })
2277    }
2278
2279    fn replay(
2280        &self,
2281        input: serde_json::Value,
2282        output: serde_json::Value,
2283        event_stream: ToolCallEventStream,
2284        cx: &mut App,
2285    ) -> Result<()> {
2286        let input = serde_json::from_value(input)?;
2287        let output = serde_json::from_value(output)?;
2288        self.0.replay(input, output, event_stream, cx)
2289    }
2290}
2291
2292#[derive(Clone)]
2293struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2294
2295impl ThreadEventStream {
2296    fn send_user_message(&self, message: &UserMessage) {
2297        self.0
2298            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2299            .ok();
2300    }
2301
2302    fn send_text(&self, text: &str) {
2303        self.0
2304            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2305            .ok();
2306    }
2307
2308    fn send_thinking(&self, text: &str) {
2309        self.0
2310            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2311            .ok();
2312    }
2313
2314    fn send_tool_call(
2315        &self,
2316        id: &LanguageModelToolUseId,
2317        title: SharedString,
2318        kind: acp::ToolKind,
2319        input: serde_json::Value,
2320    ) {
2321        self.0
2322            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2323                id,
2324                title.to_string(),
2325                kind,
2326                input,
2327            ))))
2328            .ok();
2329    }
2330
2331    fn initial_tool_call(
2332        id: &LanguageModelToolUseId,
2333        title: String,
2334        kind: acp::ToolKind,
2335        input: serde_json::Value,
2336    ) -> acp::ToolCall {
2337        acp::ToolCall {
2338            meta: None,
2339            id: acp::ToolCallId(id.to_string().into()),
2340            title,
2341            kind,
2342            status: acp::ToolCallStatus::Pending,
2343            content: vec![],
2344            locations: vec![],
2345            raw_input: Some(input),
2346            raw_output: None,
2347        }
2348    }
2349
2350    fn update_tool_call_fields(
2351        &self,
2352        tool_use_id: &LanguageModelToolUseId,
2353        fields: acp::ToolCallUpdateFields,
2354    ) {
2355        self.0
2356            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2357                acp::ToolCallUpdate {
2358                    meta: None,
2359                    id: acp::ToolCallId(tool_use_id.to_string().into()),
2360                    fields,
2361                }
2362                .into(),
2363            )))
2364            .ok();
2365    }
2366
2367    fn send_retry(&self, status: acp_thread::RetryStatus) {
2368        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2369    }
2370
2371    fn send_stop(&self, reason: acp::StopReason) {
2372        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2373    }
2374
2375    fn send_canceled(&self) {
2376        self.0
2377            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2378            .ok();
2379    }
2380
2381    fn send_error(&self, error: impl Into<anyhow::Error>) {
2382        self.0.unbounded_send(Err(error.into())).ok();
2383    }
2384}
2385
/// Handle a running tool uses to report updates for its own tool call on the
/// thread's event stream.
#[derive(Clone)]
pub struct ToolCallEventStream {
    /// Identifier of the tool use whose updates this stream reports.
    tool_use_id: LanguageModelToolUseId,
    /// Underlying thread event stream that updates are sent on.
    stream: ThreadEventStream,
    /// Filesystem handle, when provided. The thread passes the project's `Fs`;
    /// `ToolCallEventStream::test` passes `None`.
    fs: Option<Arc<dyn Fs>>,
}
2392
2393impl ToolCallEventStream {
2394    #[cfg(test)]
2395    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
2396        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2397
2398        let stream = ToolCallEventStream::new("test_id".into(), ThreadEventStream(events_tx), None);
2399
2400        (stream, ToolCallEventStreamReceiver(events_rx))
2401    }
2402
2403    fn new(
2404        tool_use_id: LanguageModelToolUseId,
2405        stream: ThreadEventStream,
2406        fs: Option<Arc<dyn Fs>>,
2407    ) -> Self {
2408        Self {
2409            tool_use_id,
2410            stream,
2411            fs,
2412        }
2413    }
2414
2415    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
2416        self.stream
2417            .update_tool_call_fields(&self.tool_use_id, fields);
2418    }
2419
2420    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2421        self.stream
2422            .0
2423            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2424                acp_thread::ToolCallUpdateDiff {
2425                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2426                    diff,
2427                }
2428                .into(),
2429            )))
2430            .ok();
2431    }
2432
2433    pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
2434        if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
2435            return Task::ready(Ok(()));
2436        }
2437
2438        let (response_tx, response_rx) = oneshot::channel();
2439        self.stream
2440            .0
2441            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
2442                ToolCallAuthorization {
2443                    tool_call: acp::ToolCallUpdate {
2444                        meta: None,
2445                        id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2446                        fields: acp::ToolCallUpdateFields {
2447                            title: Some(title.into()),
2448                            ..Default::default()
2449                        },
2450                    },
2451                    options: vec![
2452                        acp::PermissionOption {
2453                            id: acp::PermissionOptionId("always_allow".into()),
2454                            name: "Always Allow".into(),
2455                            kind: acp::PermissionOptionKind::AllowAlways,
2456                            meta: None,
2457                        },
2458                        acp::PermissionOption {
2459                            id: acp::PermissionOptionId("allow".into()),
2460                            name: "Allow".into(),
2461                            kind: acp::PermissionOptionKind::AllowOnce,
2462                            meta: None,
2463                        },
2464                        acp::PermissionOption {
2465                            id: acp::PermissionOptionId("deny".into()),
2466                            name: "Deny".into(),
2467                            kind: acp::PermissionOptionKind::RejectOnce,
2468                            meta: None,
2469                        },
2470                    ],
2471                    response: response_tx,
2472                },
2473            )))
2474            .ok();
2475        let fs = self.fs.clone();
2476        cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
2477            "always_allow" => {
2478                if let Some(fs) = fs.clone() {
2479                    cx.update(|cx| {
2480                        update_settings_file(fs, cx, |settings, _| {
2481                            settings
2482                                .agent
2483                                .get_or_insert_default()
2484                                .set_always_allow_tool_actions(true);
2485                        });
2486                    })?;
2487                }
2488
2489                Ok(())
2490            }
2491            "allow" => Ok(()),
2492            _ => Err(anyhow!("Permission to run tool denied by user")),
2493        })
2494    }
2495}
2496
/// Test-only receiving end of a [`ToolCallEventStream`] built with
/// `ToolCallEventStream::test`, used to assert on the events a tool emits.
#[cfg(test)]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
2499
#[cfg(test)]
impl ToolCallEventStreamReceiver {
    /// Awaits the next event and returns it as a tool-call authorization request.
    ///
    /// # Panics
    /// Panics if the stream ends, yields an error, or yields any other kind of event.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) => auth,
            unexpected => panic!("Expected ToolCallAuthorization but got: {:?}", unexpected),
        }
    }

    /// Awaits the next event and returns its tool-call field update.
    ///
    /// # Panics
    /// Panics unless the next item is an `UpdateFields` tool-call update.
    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            )))) => update.fields,
            unexpected => panic!("Expected update fields but got: {:?}", unexpected),
        }
    }

    /// Awaits the next event and returns the diff entity it carries.
    ///
    /// # Panics
    /// Panics unless the next item is an `UpdateDiff` tool-call update.
    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
                update,
            )))) => update.diff,
            unexpected => panic!("Expected diff but got: {:?}", unexpected),
        }
    }

    /// Awaits the next event and returns the terminal entity it carries.
    ///
    /// # Panics
    /// Panics unless the next item is an `UpdateTerminal` tool-call update.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
                update,
            )))) => update.terminal,
            unexpected => panic!("Expected terminal but got: {:?}", unexpected),
        }
    }
}
2547
/// Exposes the wrapped receiver so tests can call its methods directly.
#[cfg(test)]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2556
/// Mutable access to the wrapped receiver, e.g. for polling it directly in tests.
#[cfg(test)]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2563
2564impl From<&str> for UserMessageContent {
2565    fn from(text: &str) -> Self {
2566        Self::Text(text.into())
2567    }
2568}
2569
2570impl From<acp::ContentBlock> for UserMessageContent {
2571    fn from(value: acp::ContentBlock) -> Self {
2572        match value {
2573            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
2574            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
2575            acp::ContentBlock::Audio(_) => {
2576                // TODO
2577                Self::Text("[audio]".to_string())
2578            }
2579            acp::ContentBlock::ResourceLink(resource_link) => {
2580                match MentionUri::parse(&resource_link.uri) {
2581                    Ok(uri) => Self::Mention {
2582                        uri,
2583                        content: String::new(),
2584                    },
2585                    Err(err) => {
2586                        log::error!("Failed to parse mention link: {}", err);
2587                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
2588                    }
2589                }
2590            }
2591            acp::ContentBlock::Resource(resource) => match resource.resource {
2592                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
2593                    match MentionUri::parse(&resource.uri) {
2594                        Ok(uri) => Self::Mention {
2595                            uri,
2596                            content: resource.text,
2597                        },
2598                        Err(err) => {
2599                            log::error!("Failed to parse mention link: {}", err);
2600                            Self::Text(
2601                                MarkdownCodeBlock {
2602                                    tag: &resource.uri,
2603                                    text: &resource.text,
2604                                }
2605                                .to_string(),
2606                            )
2607                        }
2608                    }
2609                }
2610                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
2611                    // TODO
2612                    Self::Text("[blob]".to_string())
2613                }
2614            },
2615        }
2616    }
2617}
2618
2619impl From<UserMessageContent> for acp::ContentBlock {
2620    fn from(content: UserMessageContent) -> Self {
2621        match content {
2622            UserMessageContent::Text(text) => acp::ContentBlock::Text(acp::TextContent {
2623                text,
2624                annotations: None,
2625                meta: None,
2626            }),
2627            UserMessageContent::Image(image) => acp::ContentBlock::Image(acp::ImageContent {
2628                data: image.source.to_string(),
2629                mime_type: "image/png".to_string(),
2630                meta: None,
2631                annotations: None,
2632                uri: None,
2633            }),
2634            UserMessageContent::Mention { uri, content } => {
2635                acp::ContentBlock::Resource(acp::EmbeddedResource {
2636                    meta: None,
2637                    resource: acp::EmbeddedResourceResource::TextResourceContents(
2638                        acp::TextResourceContents {
2639                            meta: None,
2640                            mime_type: None,
2641                            text: content,
2642                            uri: uri.to_uri().to_string(),
2643                        },
2644                    ),
2645                    annotations: None,
2646                })
2647            }
2648        }
2649    }
2650}
2651
2652fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
2653    LanguageModelImage {
2654        source: image_content.data.into(),
2655        // TODO: make this optional?
2656        size: gpui::Size::new(0.into(), 0.into()),
2657    }
2658}