thread.rs

   1use crate::{
   2    ContextServerRegistry, CopyPathTool, CreateDirectoryTool, DbLanguageModel, DbThread,
   3    DeletePathTool, DiagnosticsTool, EditFileTool, FetchTool, FindPathTool, GrepTool,
   4    ListDirectoryTool, MovePathTool, NowTool, OpenTool, ReadFileTool, SystemPromptTemplate,
   5    Template, Templates, TerminalTool, ThinkingTool, WebSearchTool,
   6};
   7use acp_thread::{MentionUri, UserMessageId};
   8use action_log::ActionLog;
   9use agent::thread::{GitState, ProjectSnapshot, WorktreeSnapshot};
  10use agent_client_protocol as acp;
  11use agent_settings::{
  12    AgentProfileId, AgentProfileSettings, AgentSettings, CompletionMode,
  13    SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
  14};
  15use anyhow::{Context as _, Result, anyhow};
  16use assistant_tool::adapt_schema_to_format;
  17use chrono::{DateTime, Utc};
  18use client::{ModelRequestUsage, RequestUsage};
  19use cloud_llm_client::{CompletionIntent, CompletionRequestStatus, UsageLimit};
  20use collections::{HashMap, HashSet, IndexMap};
  21use fs::Fs;
  22use futures::{
  23    FutureExt,
  24    channel::{mpsc, oneshot},
  25    future::Shared,
  26    stream::FuturesUnordered,
  27};
  28use git::repository::DiffType;
  29use gpui::{
  30    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  31};
  32use language_model::{
  33    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelExt,
  34    LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
  35    LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
  36    LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse,
  37    LanguageModelToolUseId, Role, SelectedModel, StopReason, TokenUsage,
  38};
  39use project::{
  40    Project,
  41    git_store::{GitStore, RepositoryState},
  42};
  43use prompt_store::ProjectContext;
  44use schemars::{JsonSchema, Schema};
  45use serde::{Deserialize, Serialize};
  46use settings::{Settings, update_settings_file};
  47use smol::stream::StreamExt;
  48use std::{
  49    collections::BTreeMap,
  50    ops::RangeInclusive,
  51    path::Path,
  52    rc::Rc,
  53    sync::Arc,
  54    time::{Duration, Instant},
  55};
  56use std::{fmt::Write, path::PathBuf};
  57use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock};
  58use uuid::Uuid;
  59
/// Message text for a tool call that the user canceled.
/// NOTE(review): presumably reported back to the model as the tool result — confirm at the use site.
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
/// Maximum length, in characters/bytes, of a tool name.
/// NOTE(review): presumably enforced where tools are registered — confirm at the use site.
pub const MAX_TOOL_NAME_LENGTH: usize = 64;
  62
  63/// The ID of the user prompt that initiated a request.
  64///
  65/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
  66#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  67pub struct PromptId(Arc<str>);
  68
  69impl PromptId {
  70    pub fn new() -> Self {
  71        Self(Uuid::new_v4().to_string().into())
  72    }
  73}
  74
  75impl std::fmt::Display for PromptId {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        write!(f, "{}", self.0)
  78    }
  79}
  80
/// Default maximum number of retry attempts.
/// NOTE(review): the retry loop consuming this constant is not visible here — confirm usage.
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Default base delay between retries (five seconds).
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);

/// Describes how a failed request should be retried.
/// NOTE(review): presumably applied to failed completion requests — confirm at the use site.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Retry starting from `initial_delay`, for at most `max_attempts` attempts.
    /// Presumably the delay grows after each attempt (per the variant name) — TODO confirm.
    ExponentialBackoff {
        initial_delay: Duration,
        max_attempts: u8,
    },
    /// Retry after `delay`, for at most `max_attempts` attempts.
    /// Presumably the delay stays constant between attempts (per the variant name) — TODO confirm.
    Fixed {
        delay: Duration,
        max_attempts: u8,
    },
}
  95
/// A single entry in a thread's conversation history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// A message written by the user.
    User(UserMessage),
    /// A response from the agent, including its tool uses and their results.
    Agent(AgentMessage),
    /// A marker asking the model to continue. It is sent to the model as the
    /// user text "Continue where you left off".
    Resume,
}
 102
 103impl Message {
 104    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
 105        match self {
 106            Message::Agent(agent_message) => Some(agent_message),
 107            _ => None,
 108        }
 109    }
 110
 111    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 112        match self {
 113            Message::User(message) => vec![message.to_request()],
 114            Message::Agent(message) => message.to_request(),
 115            Message::Resume => vec![LanguageModelRequestMessage {
 116                role: Role::User,
 117                content: vec!["Continue where you left off".into()],
 118                cache: false,
 119            }],
 120        }
 121    }
 122
 123    pub fn to_markdown(&self) -> String {
 124        match self {
 125            Message::User(message) => message.to_markdown(),
 126            Message::Agent(message) => message.to_markdown(),
 127            Message::Resume => "[resume]\n".into(),
 128        }
 129    }
 130
 131    pub fn role(&self) -> Role {
 132        match self {
 133            Message::User(_) | Message::Resume => Role::User,
 134            Message::Agent(_) => Role::Assistant,
 135        }
 136    }
 137}
 138
/// A message submitted by the user.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this user message.
    pub id: UserMessageId,
    /// The message body, as an ordered list of chunks.
    pub content: Vec<UserMessageContent>,
}

/// One chunk of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text typed by the user.
    Text(String),
    /// A mention of some resource together with its resolved content.
    /// `content` may be empty, in which case only the link is rendered in Markdown.
    Mention { uri: MentionUri, content: String },
    /// An attached image.
    Image(LanguageModelImage),
}
 151
 152impl UserMessage {
 153    pub fn to_markdown(&self) -> String {
 154        let mut markdown = String::from("## User\n\n");
 155
 156        for content in &self.content {
 157            match content {
 158                UserMessageContent::Text(text) => {
 159                    markdown.push_str(text);
 160                    markdown.push('\n');
 161                }
 162                UserMessageContent::Image(_) => {
 163                    markdown.push_str("<image />\n");
 164                }
 165                UserMessageContent::Mention { uri, content } => {
 166                    if !content.is_empty() {
 167                        let _ = writeln!(&mut markdown, "{}\n\n{}", uri.as_link(), content);
 168                    } else {
 169                        let _ = writeln!(&mut markdown, "{}", uri.as_link());
 170                    }
 171                }
 172            }
 173        }
 174
 175        markdown
 176    }
 177
 178    fn to_request(&self) -> LanguageModelRequestMessage {
 179        let mut message = LanguageModelRequestMessage {
 180            role: Role::User,
 181            content: Vec::with_capacity(self.content.len()),
 182            cache: false,
 183        };
 184
 185        const OPEN_CONTEXT: &str = "<context>\n\
 186            The following items were attached by the user. \
 187            They are up-to-date and don't need to be re-read.\n\n";
 188
 189        const OPEN_FILES_TAG: &str = "<files>";
 190        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 191        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 192        const OPEN_SELECTIONS_TAG: &str = "<selections>";
 193        const OPEN_THREADS_TAG: &str = "<threads>";
 194        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 195        const OPEN_RULES_TAG: &str =
 196            "<rules>\nThe user has specified the following rules that should be applied:\n";
 197
 198        let mut file_context = OPEN_FILES_TAG.to_string();
 199        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 200        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 201        let mut selection_context = OPEN_SELECTIONS_TAG.to_string();
 202        let mut thread_context = OPEN_THREADS_TAG.to_string();
 203        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 204        let mut rules_context = OPEN_RULES_TAG.to_string();
 205
 206        for chunk in &self.content {
 207            let chunk = match chunk {
 208                UserMessageContent::Text(text) => {
 209                    language_model::MessageContent::Text(text.clone())
 210                }
 211                UserMessageContent::Image(value) => {
 212                    language_model::MessageContent::Image(value.clone())
 213                }
 214                UserMessageContent::Mention { uri, content } => {
 215                    match uri {
 216                        MentionUri::File { abs_path } => {
 217                            write!(
 218                                &mut file_context,
 219                                "\n{}",
 220                                MarkdownCodeBlock {
 221                                    tag: &codeblock_tag(abs_path, None),
 222                                    text: &content.to_string(),
 223                                }
 224                            )
 225                            .ok();
 226                        }
 227                        MentionUri::PastedImage => {
 228                            debug_panic!("pasted image URI should not be used in mention content")
 229                        }
 230                        MentionUri::Directory { .. } => {
 231                            write!(&mut directory_context, "\n{}\n", content).ok();
 232                        }
 233                        MentionUri::Symbol {
 234                            abs_path: path,
 235                            line_range,
 236                            ..
 237                        } => {
 238                            write!(
 239                                &mut symbol_context,
 240                                "\n{}",
 241                                MarkdownCodeBlock {
 242                                    tag: &codeblock_tag(path, Some(line_range)),
 243                                    text: content
 244                                }
 245                            )
 246                            .ok();
 247                        }
 248                        MentionUri::Selection {
 249                            abs_path: path,
 250                            line_range,
 251                            ..
 252                        } => {
 253                            write!(
 254                                &mut selection_context,
 255                                "\n{}",
 256                                MarkdownCodeBlock {
 257                                    tag: &codeblock_tag(
 258                                        path.as_deref().unwrap_or("Untitled".as_ref()),
 259                                        Some(line_range)
 260                                    ),
 261                                    text: content
 262                                }
 263                            )
 264                            .ok();
 265                        }
 266                        MentionUri::Thread { .. } => {
 267                            write!(&mut thread_context, "\n{}\n", content).ok();
 268                        }
 269                        MentionUri::TextThread { .. } => {
 270                            write!(&mut thread_context, "\n{}\n", content).ok();
 271                        }
 272                        MentionUri::Rule { .. } => {
 273                            write!(
 274                                &mut rules_context,
 275                                "\n{}",
 276                                MarkdownCodeBlock {
 277                                    tag: "",
 278                                    text: content
 279                                }
 280                            )
 281                            .ok();
 282                        }
 283                        MentionUri::Fetch { url } => {
 284                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 285                        }
 286                    }
 287
 288                    language_model::MessageContent::Text(uri.as_link().to_string())
 289                }
 290            };
 291
 292            message.content.push(chunk);
 293        }
 294
 295        let len_before_context = message.content.len();
 296
 297        if file_context.len() > OPEN_FILES_TAG.len() {
 298            file_context.push_str("</files>\n");
 299            message
 300                .content
 301                .push(language_model::MessageContent::Text(file_context));
 302        }
 303
 304        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 305            directory_context.push_str("</directories>\n");
 306            message
 307                .content
 308                .push(language_model::MessageContent::Text(directory_context));
 309        }
 310
 311        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 312            symbol_context.push_str("</symbols>\n");
 313            message
 314                .content
 315                .push(language_model::MessageContent::Text(symbol_context));
 316        }
 317
 318        if selection_context.len() > OPEN_SELECTIONS_TAG.len() {
 319            selection_context.push_str("</selections>\n");
 320            message
 321                .content
 322                .push(language_model::MessageContent::Text(selection_context));
 323        }
 324
 325        if thread_context.len() > OPEN_THREADS_TAG.len() {
 326            thread_context.push_str("</threads>\n");
 327            message
 328                .content
 329                .push(language_model::MessageContent::Text(thread_context));
 330        }
 331
 332        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 333            fetch_context.push_str("</fetched_urls>\n");
 334            message
 335                .content
 336                .push(language_model::MessageContent::Text(fetch_context));
 337        }
 338
 339        if rules_context.len() > OPEN_RULES_TAG.len() {
 340            rules_context.push_str("</user_rules>\n");
 341            message
 342                .content
 343                .push(language_model::MessageContent::Text(rules_context));
 344        }
 345
 346        if message.content.len() > len_before_context {
 347            message.content.insert(
 348                len_before_context,
 349                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 350            );
 351            message
 352                .content
 353                .push(language_model::MessageContent::Text("</context>".into()));
 354        }
 355
 356        message
 357    }
 358}
 359
 360fn codeblock_tag(full_path: &Path, line_range: Option<&RangeInclusive<u32>>) -> String {
 361    let mut result = String::new();
 362
 363    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 364        let _ = write!(result, "{} ", extension);
 365    }
 366
 367    let _ = write!(result, "{}", full_path.display());
 368
 369    if let Some(range) = line_range {
 370        if range.start() == range.end() {
 371            let _ = write!(result, ":{}", range.start() + 1);
 372        } else {
 373            let _ = write!(result, ":{}-{}", range.start() + 1, range.end() + 1);
 374        }
 375    }
 376
 377    result
 378}
 379
 380impl AgentMessage {
 381    pub fn to_markdown(&self) -> String {
 382        let mut markdown = String::from("## Assistant\n\n");
 383
 384        for content in &self.content {
 385            match content {
 386                AgentMessageContent::Text(text) => {
 387                    markdown.push_str(text);
 388                    markdown.push('\n');
 389                }
 390                AgentMessageContent::Thinking { text, .. } => {
 391                    markdown.push_str("<think>");
 392                    markdown.push_str(text);
 393                    markdown.push_str("</think>\n");
 394                }
 395                AgentMessageContent::RedactedThinking(_) => {
 396                    markdown.push_str("<redacted_thinking />\n")
 397                }
 398                AgentMessageContent::ToolUse(tool_use) => {
 399                    markdown.push_str(&format!(
 400                        "**Tool Use**: {} (ID: {})\n",
 401                        tool_use.name, tool_use.id
 402                    ));
 403                    markdown.push_str(&format!(
 404                        "{}\n",
 405                        MarkdownCodeBlock {
 406                            tag: "json",
 407                            text: &format!("{:#}", tool_use.input)
 408                        }
 409                    ));
 410                }
 411            }
 412        }
 413
 414        for tool_result in self.tool_results.values() {
 415            markdown.push_str(&format!(
 416                "**Tool Result**: {} (ID: {})\n\n",
 417                tool_result.tool_name, tool_result.tool_use_id
 418            ));
 419            if tool_result.is_error {
 420                markdown.push_str("**ERROR:**\n");
 421            }
 422
 423            match &tool_result.content {
 424                LanguageModelToolResultContent::Text(text) => {
 425                    writeln!(markdown, "{text}\n").ok();
 426                }
 427                LanguageModelToolResultContent::Image(_) => {
 428                    writeln!(markdown, "<image />\n").ok();
 429                }
 430            }
 431
 432            if let Some(output) = tool_result.output.as_ref() {
 433                writeln!(
 434                    markdown,
 435                    "**Debug Output**:\n\n```json\n{}\n```\n",
 436                    serde_json::to_string_pretty(output).unwrap()
 437                )
 438                .unwrap();
 439            }
 440        }
 441
 442        markdown
 443    }
 444
 445    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 446        let mut assistant_message = LanguageModelRequestMessage {
 447            role: Role::Assistant,
 448            content: Vec::with_capacity(self.content.len()),
 449            cache: false,
 450        };
 451        for chunk in &self.content {
 452            match chunk {
 453                AgentMessageContent::Text(text) => {
 454                    assistant_message
 455                        .content
 456                        .push(language_model::MessageContent::Text(text.clone()));
 457                }
 458                AgentMessageContent::Thinking { text, signature } => {
 459                    assistant_message
 460                        .content
 461                        .push(language_model::MessageContent::Thinking {
 462                            text: text.clone(),
 463                            signature: signature.clone(),
 464                        });
 465                }
 466                AgentMessageContent::RedactedThinking(value) => {
 467                    assistant_message.content.push(
 468                        language_model::MessageContent::RedactedThinking(value.clone()),
 469                    );
 470                }
 471                AgentMessageContent::ToolUse(tool_use) => {
 472                    if self.tool_results.contains_key(&tool_use.id) {
 473                        assistant_message
 474                            .content
 475                            .push(language_model::MessageContent::ToolUse(tool_use.clone()));
 476                    }
 477                }
 478            };
 479        }
 480
 481        let mut user_message = LanguageModelRequestMessage {
 482            role: Role::User,
 483            content: Vec::new(),
 484            cache: false,
 485        };
 486
 487        for tool_result in self.tool_results.values() {
 488            let mut tool_result = tool_result.clone();
 489            // Surprisingly, the API fails if we return an empty string here.
 490            // It thinks we are sending a tool use without a tool result.
 491            if tool_result.content.is_empty() {
 492                tool_result.content = "<Tool returned an empty string>".into();
 493            }
 494            user_message
 495                .content
 496                .push(language_model::MessageContent::ToolResult(tool_result));
 497        }
 498
 499        let mut messages = Vec::new();
 500        if !assistant_message.content.is_empty() {
 501            messages.push(assistant_message);
 502        }
 503        if !user_message.content.is_empty() {
 504            messages.push(user_message);
 505        }
 506        messages
 507    }
 508}
 509
/// A response produced by the agent.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// The response body, as an ordered list of chunks.
    pub content: Vec<AgentMessageContent>,
    /// Results of tool uses in `content`, keyed by tool-use id and kept in
    /// insertion order. Tool uses without an entry here are dropped by `to_request`.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
}

/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Visible response text.
    Text(String),
    /// Model reasoning text, with the optional signature that is sent back to
    /// the model alongside it.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Redacted reasoning, forwarded to the model as-is and not shown in replays.
    RedactedThinking(String),
    /// A request from the model to invoke a tool.
    ToolUse(LanguageModelToolUse),
}
 526
/// A handle to a terminal created through a [`ThreadEnvironment`].
pub trait TerminalHandle {
    /// Returns the ACP id of this terminal.
    fn id(&self, cx: &AsyncApp) -> Result<acp::TerminalId>;
    /// Returns the terminal's current output.
    fn current_output(&self, cx: &AsyncApp) -> Result<acp::TerminalOutputResponse>;
    /// Returns a shareable task that resolves to the terminal's exit status.
    fn wait_for_exit(&self, cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>>;
}

/// Services a thread's environment provides to tools.
pub trait ThreadEnvironment {
    /// Creates a terminal running `command`, optionally in `cwd`, with output
    /// limited to `output_byte_limit` bytes when set. Returns a handle to it.
    fn create_terminal(
        &self,
        command: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn TerminalHandle>>>;
}
 542
/// Events emitted by a thread to its observer (e.g. while running or replaying).
#[derive(Debug)]
pub enum ThreadEvent {
    /// A user message in the conversation.
    UserMessage(UserMessage),
    /// Agent response text.
    AgentText(String),
    /// Agent reasoning text.
    AgentThinking(String),
    /// A new tool call.
    ToolCall(acp::ToolCall),
    /// An update to an existing tool call.
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    /// A tool call that needs the user's permission before running.
    ToolCallAuthorization(ToolCallAuthorization),
    /// Status of a request retry.
    Retry(acp_thread::RetryStatus),
    /// The turn stopped for the given reason.
    Stop(acp::StopReason),
}

/// A request to create a terminal. The result is sent back through `response`.
#[derive(Debug)]
pub struct NewTerminal {
    /// Command to run.
    pub command: String,
    /// Optional cap on captured output, in bytes.
    pub output_byte_limit: Option<u64>,
    /// Optional working directory.
    pub cwd: Option<PathBuf>,
    /// Receives the created terminal, or an error.
    pub response: oneshot::Sender<Result<Entity<acp_thread::Terminal>>>,
}

/// A request for the user to authorize a tool call.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call awaiting authorization.
    pub tool_call: acp::ToolCallUpdate,
    /// The permission options offered to the user.
    pub options: Vec<acp::PermissionOption>,
    /// Receives the id of the option the user picked.
    pub response: oneshot::Sender<acp::PermissionOptionId>,
}
 569
/// Ways a completion can end unsuccessfully.
#[derive(Debug, thiserror::Error)]
enum CompletionError {
    /// The model hit its maximum output token limit.
    #[error("max tokens")]
    MaxTokens,
    /// The model refused to respond.
    #[error("refusal")]
    Refusal,
    /// Any other error; converts from `anyhow::Error` via `?`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
 579
/// An agent conversation: its history, model selection, tools, and the state
/// of any in-flight turn.
pub struct Thread {
    /// ACP session id. `new` generates it from a UUID v4; `from_db` takes it from the caller.
    id: acp::SessionId,
    /// Prompt id; a fresh one is generated whenever the thread is constructed.
    prompt_id: PromptId,
    /// Last-updated timestamp. `new` sets it to now; `from_db` restores it.
    updated_at: DateTime<Utc>,
    /// Thread title. `None` when unset; an empty stored title is restored as `None`.
    title: Option<SharedString>,
    /// NOTE(review): presumably the in-progress title-generation task — inferred from the name.
    pending_title_generation: Option<Task<()>>,
    /// Detailed summary, persisted as `DbThread::detailed_summary`.
    summary: Option<SharedString>,
    /// Conversation history, in order.
    messages: Vec<Message>,
    /// Completion mode. `new` takes the preferred mode from settings; `from_db` restores it.
    completion_mode: CompletionMode,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// NOTE(review): presumably the agent message being built during the running turn — confirm.
    pending_message: Option<AgentMessage>,
    /// Tools available to the model, keyed by tool name.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// NOTE(review): presumably set when the tool-use limit is reported — confirm.
    tool_use_limit_reached: bool,
    /// Token usage recorded per user message; persisted and restored.
    request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
    /// Cumulative token usage for the thread; restored by `from_db`.
    // NOTE(review): `allow(unused)` presumably because it is only carried for
    // persistence — confirm and document the reason next to the attribute.
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    /// Snapshot of the project taken when the thread was created, as a shareable task.
    // NOTE(review): `to_db` clones this field, so the `allow(unused)` may be stale.
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Registry of context servers; also searched for tools by name.
    context_server_registry: Entity<ContextServerRegistry>,
    /// Agent profile in use; falls back to the settings' default profile.
    profile_id: AgentProfileId,
    project_context: Entity<ProjectContext>,
    templates: Arc<Templates>,
    /// The selected language model, if any.
    model: Option<Arc<dyn LanguageModel>>,
    /// Model used for summarization, if any; starts as `None`.
    summarization_model: Option<Arc<dyn LanguageModel>>,
    /// Sender side of the prompt-capabilities watch channel, seeded from `model`.
    prompt_capabilities_tx: watch::Sender<acp::PromptCapabilities>,
    /// Receiver side of the prompt-capabilities watch channel.
    pub(crate) prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
    pub(crate) project: Entity<Project>,
    /// Action log for this thread's project.
    pub(crate) action_log: Entity<ActionLog>,
}
 612
 613impl Thread {
 614    fn prompt_capabilities(model: Option<&dyn LanguageModel>) -> acp::PromptCapabilities {
 615        let image = model.map_or(true, |model| model.supports_images());
 616        acp::PromptCapabilities {
 617            meta: None,
 618            image,
 619            audio: false,
 620            embedded_context: true,
 621        }
 622    }
 623
    /// Creates a fresh thread with a new random session id and prompt id.
    ///
    /// The profile and completion mode come from the global [`AgentSettings`].
    /// A new [`ActionLog`] is created for `project`, and prompt capabilities are
    /// derived from `model`. The thread starts with no messages, no tools, and
    /// no title.
    pub fn new(
        project: Entity<Project>,
        project_context: Entity<ProjectContext>,
        context_server_registry: Entity<ContextServerRegistry>,
        templates: Arc<Templates>,
        model: Option<Arc<dyn LanguageModel>>,
        cx: &mut Context<Self>,
    ) -> Self {
        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
        let action_log = cx.new(|_cx| ActionLog::new(project.clone()));
        let (prompt_capabilities_tx, prompt_capabilities_rx) =
            watch::channel(Self::prompt_capabilities(model.as_deref()));
        Self {
            id: acp::SessionId(uuid::Uuid::new_v4().to_string().into()),
            prompt_id: PromptId::new(),
            updated_at: Utc::now(),
            title: None,
            pending_title_generation: None,
            summary: None,
            messages: Vec::new(),
            completion_mode: AgentSettings::get_global(cx).preferred_completion_mode,
            running_turn: None,
            pending_message: None,
            tools: BTreeMap::default(),
            tool_use_limit_reached: false,
            request_token_usage: HashMap::default(),
            cumulative_token_usage: TokenUsage::default(),
            // Start `Self::project_snapshot` now and keep its result as a
            // shareable task so it can be awaited more than once.
            initial_project_snapshot: {
                let project_snapshot = Self::project_snapshot(project.clone(), cx);
                cx.foreground_executor()
                    .spawn(async move { Some(project_snapshot.await) })
                    .shared()
            },
            context_server_registry,
            profile_id,
            project_context,
            templates,
            model,
            summarization_model: None,
            prompt_capabilities_tx,
            prompt_capabilities_rx,
            project,
            action_log,
        }
    }
 669
 670    pub fn id(&self) -> &acp::SessionId {
 671        &self.id
 672    }
 673
 674    pub fn replay(
 675        &mut self,
 676        cx: &mut Context<Self>,
 677    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 678        let (tx, rx) = mpsc::unbounded();
 679        let stream = ThreadEventStream(tx);
 680        for message in &self.messages {
 681            match message {
 682                Message::User(user_message) => stream.send_user_message(user_message),
 683                Message::Agent(assistant_message) => {
 684                    for content in &assistant_message.content {
 685                        match content {
 686                            AgentMessageContent::Text(text) => stream.send_text(text),
 687                            AgentMessageContent::Thinking { text, .. } => {
 688                                stream.send_thinking(text)
 689                            }
 690                            AgentMessageContent::RedactedThinking(_) => {}
 691                            AgentMessageContent::ToolUse(tool_use) => {
 692                                self.replay_tool_call(
 693                                    tool_use,
 694                                    assistant_message.tool_results.get(&tool_use.id),
 695                                    &stream,
 696                                    cx,
 697                                );
 698                            }
 699                        }
 700                    }
 701                }
 702                Message::Resume => {}
 703            }
 704        }
 705        rx
 706    }
 707
 708    fn replay_tool_call(
 709        &self,
 710        tool_use: &LanguageModelToolUse,
 711        tool_result: Option<&LanguageModelToolResult>,
 712        stream: &ThreadEventStream,
 713        cx: &mut Context<Self>,
 714    ) {
 715        let tool = self.tools.get(tool_use.name.as_ref()).cloned().or_else(|| {
 716            self.context_server_registry
 717                .read(cx)
 718                .servers()
 719                .find_map(|(_, tools)| {
 720                    if let Some(tool) = tools.get(tool_use.name.as_ref()) {
 721                        Some(tool.clone())
 722                    } else {
 723                        None
 724                    }
 725                })
 726        });
 727
 728        let Some(tool) = tool else {
 729            stream
 730                .0
 731                .unbounded_send(Ok(ThreadEvent::ToolCall(acp::ToolCall {
 732                    meta: None,
 733                    id: acp::ToolCallId(tool_use.id.to_string().into()),
 734                    title: tool_use.name.to_string(),
 735                    kind: acp::ToolKind::Other,
 736                    status: acp::ToolCallStatus::Failed,
 737                    content: Vec::new(),
 738                    locations: Vec::new(),
 739                    raw_input: Some(tool_use.input.clone()),
 740                    raw_output: None,
 741                })))
 742                .ok();
 743            return;
 744        };
 745
 746        let title = tool.initial_title(tool_use.input.clone(), cx);
 747        let kind = tool.kind();
 748        stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
 749
 750        let output = tool_result
 751            .as_ref()
 752            .and_then(|result| result.output.clone());
 753        if let Some(output) = output.clone() {
 754            let tool_event_stream = ToolCallEventStream::new(
 755                tool_use.id.clone(),
 756                stream.clone(),
 757                Some(self.project.read(cx).fs().clone()),
 758            );
 759            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 760                .log_err();
 761        }
 762
 763        stream.update_tool_call_fields(
 764            &tool_use.id,
 765            acp::ToolCallUpdateFields {
 766                status: Some(
 767                    tool_result
 768                        .as_ref()
 769                        .map_or(acp::ToolCallStatus::Failed, |result| {
 770                            if result.is_error {
 771                                acp::ToolCallStatus::Failed
 772                            } else {
 773                                acp::ToolCallStatus::Completed
 774                            }
 775                        }),
 776                ),
 777                raw_output: output,
 778                ..Default::default()
 779            },
 780        );
 781    }
 782
 783    pub fn from_db(
 784        id: acp::SessionId,
 785        db_thread: DbThread,
 786        project: Entity<Project>,
 787        project_context: Entity<ProjectContext>,
 788        context_server_registry: Entity<ContextServerRegistry>,
 789        action_log: Entity<ActionLog>,
 790        templates: Arc<Templates>,
 791        cx: &mut Context<Self>,
 792    ) -> Self {
 793        let profile_id = db_thread
 794            .profile
 795            .unwrap_or_else(|| AgentSettings::get_global(cx).default_profile.clone());
 796        let model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
 797            db_thread
 798                .model
 799                .and_then(|model| {
 800                    let model = SelectedModel {
 801                        provider: model.provider.clone().into(),
 802                        model: model.model.into(),
 803                    };
 804                    registry.select_model(&model, cx)
 805                })
 806                .or_else(|| registry.default_model())
 807                .map(|model| model.model)
 808        });
 809        let (prompt_capabilities_tx, prompt_capabilities_rx) =
 810            watch::channel(Self::prompt_capabilities(model.as_deref()));
 811
 812        Self {
 813            id,
 814            prompt_id: PromptId::new(),
 815            title: if db_thread.title.is_empty() {
 816                None
 817            } else {
 818                Some(db_thread.title.clone())
 819            },
 820            pending_title_generation: None,
 821            summary: db_thread.detailed_summary,
 822            messages: db_thread.messages,
 823            completion_mode: db_thread.completion_mode.unwrap_or_default(),
 824            running_turn: None,
 825            pending_message: None,
 826            tools: BTreeMap::default(),
 827            tool_use_limit_reached: false,
 828            request_token_usage: db_thread.request_token_usage.clone(),
 829            cumulative_token_usage: db_thread.cumulative_token_usage,
 830            initial_project_snapshot: Task::ready(db_thread.initial_project_snapshot).shared(),
 831            context_server_registry,
 832            profile_id,
 833            project_context,
 834            templates,
 835            model,
 836            summarization_model: None,
 837            project,
 838            action_log,
 839            updated_at: db_thread.updated_at,
 840            prompt_capabilities_tx,
 841            prompt_capabilities_rx,
 842        }
 843    }
 844
 845    pub fn to_db(&self, cx: &App) -> Task<DbThread> {
 846        let initial_project_snapshot = self.initial_project_snapshot.clone();
 847        let mut thread = DbThread {
 848            title: self.title(),
 849            messages: self.messages.clone(),
 850            updated_at: self.updated_at,
 851            detailed_summary: self.summary.clone(),
 852            initial_project_snapshot: None,
 853            cumulative_token_usage: self.cumulative_token_usage,
 854            request_token_usage: self.request_token_usage.clone(),
 855            model: self.model.as_ref().map(|model| DbLanguageModel {
 856                provider: model.provider_id().to_string(),
 857                model: model.name().0.to_string(),
 858            }),
 859            completion_mode: Some(self.completion_mode),
 860            profile: Some(self.profile_id.clone()),
 861        };
 862
 863        cx.background_spawn(async move {
 864            let initial_project_snapshot = initial_project_snapshot.await;
 865            thread.initial_project_snapshot = initial_project_snapshot;
 866            thread
 867        })
 868    }
 869
 870    /// Create a snapshot of the current project state including git information and unsaved buffers.
 871    fn project_snapshot(
 872        project: Entity<Project>,
 873        cx: &mut Context<Self>,
 874    ) -> Task<Arc<agent::thread::ProjectSnapshot>> {
 875        let git_store = project.read(cx).git_store().clone();
 876        let worktree_snapshots: Vec<_> = project
 877            .read(cx)
 878            .visible_worktrees(cx)
 879            .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
 880            .collect();
 881
 882        cx.spawn(async move |_, _| {
 883            let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
 884
 885            Arc::new(ProjectSnapshot {
 886                worktree_snapshots,
 887                timestamp: Utc::now(),
 888            })
 889        })
 890    }
 891
 892    fn worktree_snapshot(
 893        worktree: Entity<project::Worktree>,
 894        git_store: Entity<GitStore>,
 895        cx: &App,
 896    ) -> Task<agent::thread::WorktreeSnapshot> {
 897        cx.spawn(async move |cx| {
 898            // Get worktree path and snapshot
 899            let worktree_info = cx.update(|app_cx| {
 900                let worktree = worktree.read(app_cx);
 901                let path = worktree.abs_path().to_string_lossy().into_owned();
 902                let snapshot = worktree.snapshot();
 903                (path, snapshot)
 904            });
 905
 906            let Ok((worktree_path, _snapshot)) = worktree_info else {
 907                return WorktreeSnapshot {
 908                    worktree_path: String::new(),
 909                    git_state: None,
 910                };
 911            };
 912
 913            let git_state = git_store
 914                .update(cx, |git_store, cx| {
 915                    git_store
 916                        .repositories()
 917                        .values()
 918                        .find(|repo| {
 919                            repo.read(cx)
 920                                .abs_path_to_repo_path(&worktree.read(cx).abs_path())
 921                                .is_some()
 922                        })
 923                        .cloned()
 924                })
 925                .ok()
 926                .flatten()
 927                .map(|repo| {
 928                    repo.update(cx, |repo, _| {
 929                        let current_branch =
 930                            repo.branch.as_ref().map(|branch| branch.name().to_owned());
 931                        repo.send_job(None, |state, _| async move {
 932                            let RepositoryState::Local { backend, .. } = state else {
 933                                return GitState {
 934                                    remote_url: None,
 935                                    head_sha: None,
 936                                    current_branch,
 937                                    diff: None,
 938                                };
 939                            };
 940
 941                            let remote_url = backend.remote_url("origin");
 942                            let head_sha = backend.head_sha().await;
 943                            let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
 944
 945                            GitState {
 946                                remote_url,
 947                                head_sha,
 948                                current_branch,
 949                                diff,
 950                            }
 951                        })
 952                    })
 953                });
 954
 955            let git_state = match git_state {
 956                Some(git_state) => match git_state.ok() {
 957                    Some(git_state) => git_state.await.ok(),
 958                    None => None,
 959                },
 960                None => None,
 961            };
 962
 963            WorktreeSnapshot {
 964                worktree_path,
 965                git_state,
 966            }
 967        })
 968    }
 969
 970    pub fn project_context(&self) -> &Entity<ProjectContext> {
 971        &self.project_context
 972    }
 973
 974    pub fn project(&self) -> &Entity<Project> {
 975        &self.project
 976    }
 977
 978    pub fn action_log(&self) -> &Entity<ActionLog> {
 979        &self.action_log
 980    }
 981
 982    pub fn is_empty(&self) -> bool {
 983        self.messages.is_empty() && self.title.is_none()
 984    }
 985
 986    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
 987        self.model.as_ref()
 988    }
 989
 990    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
 991        let old_usage = self.latest_token_usage();
 992        self.model = Some(model);
 993        let new_caps = Self::prompt_capabilities(self.model.as_deref());
 994        let new_usage = self.latest_token_usage();
 995        if old_usage != new_usage {
 996            cx.emit(TokenUsageUpdated(new_usage));
 997        }
 998        self.prompt_capabilities_tx.send(new_caps).log_err();
 999        cx.notify()
1000    }
1001
1002    pub fn summarization_model(&self) -> Option<&Arc<dyn LanguageModel>> {
1003        self.summarization_model.as_ref()
1004    }
1005
1006    pub fn set_summarization_model(
1007        &mut self,
1008        model: Option<Arc<dyn LanguageModel>>,
1009        cx: &mut Context<Self>,
1010    ) {
1011        self.summarization_model = model;
1012        cx.notify()
1013    }
1014
1015    pub fn completion_mode(&self) -> CompletionMode {
1016        self.completion_mode
1017    }
1018
1019    pub fn set_completion_mode(&mut self, mode: CompletionMode, cx: &mut Context<Self>) {
1020        let old_usage = self.latest_token_usage();
1021        self.completion_mode = mode;
1022        let new_usage = self.latest_token_usage();
1023        if old_usage != new_usage {
1024            cx.emit(TokenUsageUpdated(new_usage));
1025        }
1026        cx.notify()
1027    }
1028
1029    #[cfg(any(test, feature = "test-support"))]
1030    pub fn last_message(&self) -> Option<Message> {
1031        if let Some(message) = self.pending_message.clone() {
1032            Some(Message::Agent(message))
1033        } else {
1034            self.messages.last().cloned()
1035        }
1036    }
1037
1038    pub fn add_default_tools(
1039        &mut self,
1040        environment: Rc<dyn ThreadEnvironment>,
1041        cx: &mut Context<Self>,
1042    ) {
1043        let language_registry = self.project.read(cx).languages().clone();
1044        self.add_tool(CopyPathTool::new(self.project.clone()));
1045        self.add_tool(CreateDirectoryTool::new(self.project.clone()));
1046        self.add_tool(DeletePathTool::new(
1047            self.project.clone(),
1048            self.action_log.clone(),
1049        ));
1050        self.add_tool(DiagnosticsTool::new(self.project.clone()));
1051        self.add_tool(EditFileTool::new(
1052            self.project.clone(),
1053            cx.weak_entity(),
1054            language_registry,
1055        ));
1056        self.add_tool(FetchTool::new(self.project.read(cx).client().http_client()));
1057        self.add_tool(FindPathTool::new(self.project.clone()));
1058        self.add_tool(GrepTool::new(self.project.clone()));
1059        self.add_tool(ListDirectoryTool::new(self.project.clone()));
1060        self.add_tool(MovePathTool::new(self.project.clone()));
1061        self.add_tool(NowTool);
1062        self.add_tool(OpenTool::new(self.project.clone()));
1063        self.add_tool(ReadFileTool::new(
1064            self.project.clone(),
1065            self.action_log.clone(),
1066        ));
1067        self.add_tool(TerminalTool::new(self.project.clone(), environment));
1068        self.add_tool(ThinkingTool);
1069        self.add_tool(WebSearchTool);
1070    }
1071
1072    pub fn add_tool<T: AgentTool>(&mut self, tool: T) {
1073        self.tools.insert(T::name().into(), tool.erase());
1074    }
1075
1076    pub fn remove_tool(&mut self, name: &str) -> bool {
1077        self.tools.remove(name).is_some()
1078    }
1079
1080    pub fn profile(&self) -> &AgentProfileId {
1081        &self.profile_id
1082    }
1083
1084    pub fn set_profile(&mut self, profile_id: AgentProfileId) {
1085        self.profile_id = profile_id;
1086    }
1087
1088    pub fn cancel(&mut self, cx: &mut Context<Self>) {
1089        if let Some(running_turn) = self.running_turn.take() {
1090            running_turn.cancel();
1091        }
1092        self.flush_pending_message(cx);
1093    }
1094
1095    fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
1096        let Some(last_user_message) = self.last_user_message() else {
1097            return;
1098        };
1099
1100        self.request_token_usage
1101            .insert(last_user_message.id.clone(), update);
1102        cx.emit(TokenUsageUpdated(self.latest_token_usage()));
1103        cx.notify();
1104    }
1105
1106    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
1107        self.cancel(cx);
1108        let Some(position) = self.messages.iter().position(
1109            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
1110        ) else {
1111            return Err(anyhow!("Message not found"));
1112        };
1113
1114        for message in self.messages.drain(position..) {
1115            match message {
1116                Message::User(message) => {
1117                    self.request_token_usage.remove(&message.id);
1118                }
1119                Message::Agent(_) | Message::Resume => {}
1120            }
1121        }
1122        self.summary = None;
1123        cx.notify();
1124        Ok(())
1125    }
1126
1127    pub fn latest_token_usage(&self) -> Option<acp_thread::TokenUsage> {
1128        let last_user_message = self.last_user_message()?;
1129        let tokens = self.request_token_usage.get(&last_user_message.id)?;
1130        let model = self.model.clone()?;
1131
1132        Some(acp_thread::TokenUsage {
1133            max_tokens: model.max_token_count_for_mode(self.completion_mode.into()),
1134            used_tokens: tokens.total_tokens(),
1135        })
1136    }
1137
1138    pub fn resume(
1139        &mut self,
1140        cx: &mut Context<Self>,
1141    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
1142        self.messages.push(Message::Resume);
1143        cx.notify();
1144
1145        log::debug!("Total messages in thread: {}", self.messages.len());
1146        self.run_turn(cx)
1147    }
1148
1149    /// Sending a message results in the model streaming a response, which could include tool calls.
1150    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
1151    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
1152    pub fn send<T>(
1153        &mut self,
1154        id: UserMessageId,
1155        content: impl IntoIterator<Item = T>,
1156        cx: &mut Context<Self>,
1157    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
1158    where
1159        T: Into<UserMessageContent>,
1160    {
1161        let model = self.model().context("No language model configured")?;
1162
1163        log::info!("Thread::send called with model: {}", model.name().0);
1164        self.advance_prompt_id();
1165
1166        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
1167        log::debug!("Thread::send content: {:?}", content);
1168
1169        self.messages
1170            .push(Message::User(UserMessage { id, content }));
1171        cx.notify();
1172
1173        log::debug!("Total messages in thread: {}", self.messages.len());
1174        self.run_turn(cx)
1175    }
1176
    /// Starts a new agent turn over the current message history.
    ///
    /// Any turn already in progress is cancelled first. The model and the active profile's
    /// enabled tools are resolved up front. Then a task is spawned that runs the completion
    /// loop (`run_turn_internal`). Progress is reported on the returned channel, which
    /// finishes with either a stop reason or an error.
    ///
    /// # Errors
    ///
    /// Returns an error if no model is configured or the thread's profile id is not found in
    /// the agent settings.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.cancel(cx);

        let model = self.model.clone().context("No language model configured")?;
        let profile = AgentSettings::get_global(cx)
            .profiles
            .get(&self.profile_id)
            .context("Profile not found")?;
        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
        let event_stream = ThreadEventStream(events_tx);
        // Index of the last message at turn start. On a refusal, the history is truncated
        // back to this index, dropping that message and everything after it.
        let message_ix = self.messages.len().saturating_sub(1);
        // Reset the tool-use-limit flag and drop any cached summary for the new turn.
        self.tool_use_limit_reached = false;
        self.summary = None;
        self.running_turn = Some(RunningTurn {
            event_stream: event_stream.clone(),
            tools: self.enabled_tools(profile, &model, cx),
            _task: cx.spawn(async move |this, cx| {
                log::debug!("Starting agent turn execution");

                let turn_result = Self::run_turn_internal(&this, model, &event_stream, cx).await;
                // Flush the pending agent message regardless of how the turn ended.
                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));

                // Translate the outcome into a final stop reason or error event.
                match turn_result {
                    Ok(()) => {
                        log::debug!("Turn execution completed");
                        event_stream.send_stop(acp::StopReason::EndTurn);
                    }
                    Err(error) => {
                        log::error!("Turn execution failed: {:?}", error);
                        match error.downcast::<CompletionError>() {
                            Ok(CompletionError::Refusal) => {
                                event_stream.send_stop(acp::StopReason::Refusal);
                                // Drop the message that triggered the refused turn.
                                _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
                            }
                            Ok(CompletionError::MaxTokens) => {
                                event_stream.send_stop(acp::StopReason::MaxTokens);
                            }
                            Ok(CompletionError::Other(error)) | Err(error) => {
                                event_stream.send_error(error);
                            }
                        }
                    }
                }

                // The turn is over; clear the handle to it.
                _ = this.update(cx, |this, _| this.running_turn.take());
            }),
        });
        Ok(events_rx)
    }
1229
    /// Drives the completion loop for a single turn.
    ///
    /// Each iteration does four things:
    /// - builds a request,
    /// - streams the model's response,
    /// - awaits any tool calls the response requested, and
    /// - records their results on the pending message.
    ///
    /// The loop then does one of the following:
    /// - Continues with `CompletionIntent::ToolResults` while the model keeps calling tools.
    /// - Retries after a stream error when `handle_completion_error` allows it.
    /// - Returns `Ok(())` once a response requests no tools.
    ///
    /// # Errors
    ///
    /// Fails if any of these happen:
    /// - the thread entity can no longer be updated,
    /// - a request cannot be built or the stream cannot be opened,
    /// - `handle_completion_event` returns an error (e.g. a refusal or max-tokens stop),
    /// - a stream error is not retryable,
    /// - the tool use limit was reached.
    async fn run_turn_internal(
        this: &WeakEntity<Self>,
        model: Arc<dyn LanguageModel>,
        event_stream: &ThreadEventStream,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        let mut attempt = 0;
        let mut intent = CompletionIntent::UserPrompt;
        loop {
            let request =
                this.update(cx, |this, cx| this.build_completion_request(intent, cx))??;

            telemetry::event!(
                "Agent Thread Completion",
                thread_id = this.read_with(cx, |this, _| this.id.to_string())?,
                prompt_id = this.read_with(cx, |this, _| this.prompt_id.to_string())?,
                model = model.telemetry_id(),
                model_provider = model.provider_id().to_string(),
                attempt
            );

            log::debug!("Calling model.stream_completion, attempt {}", attempt);
            let mut events = model
                .stream_completion(request, cx)
                .await
                .map_err(|error| anyhow!(error))?;
            let mut tool_results = FuturesUnordered::new();
            let mut error = None;
            // Apply streamed events until the stream ends or yields an error. The error is
            // kept so that tool calls already started can still finish before retrying.
            while let Some(event) = events.next().await {
                log::trace!("Received completion event: {:?}", event);
                match event {
                    Ok(event) => {
                        tool_results.extend(this.update(cx, |this, cx| {
                            this.handle_completion_event(event, event_stream, cx)
                        })??);
                    }
                    Err(err) => {
                        error = Some(err);
                        break;
                    }
                }
            }

            // With no tool calls to report back, a clean stream ends the turn.
            let end_turn = tool_results.is_empty();
            while let Some(tool_result) = tool_results.next().await {
                log::debug!("Tool finished {:?}", tool_result);

                event_stream.update_tool_call_fields(
                    &tool_result.tool_use_id,
                    acp::ToolCallUpdateFields {
                        status: Some(if tool_result.is_error {
                            acp::ToolCallStatus::Failed
                        } else {
                            acp::ToolCallStatus::Completed
                        }),
                        raw_output: tool_result.output.clone(),
                        ..Default::default()
                    },
                );
                this.update(cx, |this, _cx| {
                    this.pending_message()
                        .tool_results
                        .insert(tool_result.tool_use_id.clone(), tool_result);
                })?;
            }

            this.update(cx, |this, cx| {
                this.flush_pending_message(cx);
                if this.title.is_none() && this.pending_title_generation.is_none() {
                    this.generate_title(cx);
                }
            })?;

            if let Some(error) = error {
                attempt += 1;
                // Fails (ending the turn) if the error is not retryable or attempts ran out.
                let retry =
                    this.update(cx, |this, _| this.handle_completion_error(error, attempt))??;
                let timer = cx.background_executor().timer(retry.duration);
                event_stream.send_retry(retry);
                timer.await;
                // If the interrupted agent message has no tool results to send back, retry as
                // a user prompt with a `Message::Resume` appended (presumably asking the model
                // to continue).
                this.update(cx, |this, _cx| {
                    if let Some(Message::Agent(message)) = this.messages.last() {
                        if message.tool_results.is_empty() {
                            intent = CompletionIntent::UserPrompt;
                            this.messages.push(Message::Resume);
                        }
                    }
                })?;
            } else if this.read_with(cx, |this, _| this.tool_use_limit_reached)? {
                return Err(language_model::ToolUseLimitReachedError.into());
            } else if end_turn {
                return Ok(());
            } else {
                // Send the tool results back; a successful round resets the retry budget.
                intent = CompletionIntent::ToolResults;
                attempt = 0;
            }
        }
    }
1328
1329    fn handle_completion_error(
1330        &mut self,
1331        error: LanguageModelCompletionError,
1332        attempt: u8,
1333    ) -> Result<acp_thread::RetryStatus> {
1334        if self.completion_mode == CompletionMode::Normal {
1335            return Err(anyhow!(error));
1336        }
1337
1338        let Some(strategy) = Self::retry_strategy_for(&error) else {
1339            return Err(anyhow!(error));
1340        };
1341
1342        let max_attempts = match &strategy {
1343            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
1344            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
1345        };
1346
1347        if attempt > max_attempts {
1348            return Err(anyhow!(error));
1349        }
1350
1351        let delay = match &strategy {
1352            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
1353                let delay_secs = initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
1354                Duration::from_secs(delay_secs)
1355            }
1356            RetryStrategy::Fixed { delay, .. } => *delay,
1357        };
1358        log::debug!("Retry attempt {attempt} with delay {delay:?}");
1359
1360        Ok(acp_thread::RetryStatus {
1361            last_error: error.to_string().into(),
1362            attempt: attempt as usize,
1363            max_attempts: max_attempts as usize,
1364            started_at: Instant::now(),
1365            duration: delay,
1366        })
1367    }
1368
1369    /// A helper method that's called on every streamed completion event.
1370    /// Returns an optional tool result task, which the main agentic loop will
1371    /// send back to the model when it resolves.
1372    fn handle_completion_event(
1373        &mut self,
1374        event: LanguageModelCompletionEvent,
1375        event_stream: &ThreadEventStream,
1376        cx: &mut Context<Self>,
1377    ) -> Result<Option<Task<LanguageModelToolResult>>> {
1378        log::trace!("Handling streamed completion event: {:?}", event);
1379        use LanguageModelCompletionEvent::*;
1380
1381        match event {
1382            StartMessage { .. } => {
1383                self.flush_pending_message(cx);
1384                self.pending_message = Some(AgentMessage::default());
1385            }
1386            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
1387            Thinking { text, signature } => {
1388                self.handle_thinking_event(text, signature, event_stream, cx)
1389            }
1390            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
1391            ToolUse(tool_use) => {
1392                return Ok(self.handle_tool_use_event(tool_use, event_stream, cx));
1393            }
1394            ToolUseJsonParseError {
1395                id,
1396                tool_name,
1397                raw_input,
1398                json_parse_error,
1399            } => {
1400                return Ok(Some(Task::ready(
1401                    self.handle_tool_use_json_parse_error_event(
1402                        id,
1403                        tool_name,
1404                        raw_input,
1405                        json_parse_error,
1406                    ),
1407                )));
1408            }
1409            UsageUpdate(usage) => {
1410                telemetry::event!(
1411                    "Agent Thread Completion Usage Updated",
1412                    thread_id = self.id.to_string(),
1413                    prompt_id = self.prompt_id.to_string(),
1414                    model = self.model.as_ref().map(|m| m.telemetry_id()),
1415                    model_provider = self.model.as_ref().map(|m| m.provider_id().to_string()),
1416                    input_tokens = usage.input_tokens,
1417                    output_tokens = usage.output_tokens,
1418                    cache_creation_input_tokens = usage.cache_creation_input_tokens,
1419                    cache_read_input_tokens = usage.cache_read_input_tokens,
1420                );
1421                self.update_token_usage(usage, cx);
1422            }
1423            StatusUpdate(CompletionRequestStatus::UsageUpdated { amount, limit }) => {
1424                self.update_model_request_usage(amount, limit, cx);
1425            }
1426            StatusUpdate(
1427                CompletionRequestStatus::Started
1428                | CompletionRequestStatus::Queued { .. }
1429                | CompletionRequestStatus::Failed { .. },
1430            ) => {}
1431            StatusUpdate(CompletionRequestStatus::ToolUseLimitReached) => {
1432                self.tool_use_limit_reached = true;
1433            }
1434            Stop(StopReason::Refusal) => return Err(CompletionError::Refusal.into()),
1435            Stop(StopReason::MaxTokens) => return Err(CompletionError::MaxTokens.into()),
1436            Stop(StopReason::ToolUse | StopReason::EndTurn) => {}
1437        }
1438
1439        Ok(None)
1440    }
1441
1442    fn handle_text_event(
1443        &mut self,
1444        new_text: String,
1445        event_stream: &ThreadEventStream,
1446        cx: &mut Context<Self>,
1447    ) {
1448        event_stream.send_text(&new_text);
1449
1450        let last_message = self.pending_message();
1451        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1452            text.push_str(&new_text);
1453        } else {
1454            last_message
1455                .content
1456                .push(AgentMessageContent::Text(new_text));
1457        }
1458
1459        cx.notify();
1460    }
1461
1462    fn handle_thinking_event(
1463        &mut self,
1464        new_text: String,
1465        new_signature: Option<String>,
1466        event_stream: &ThreadEventStream,
1467        cx: &mut Context<Self>,
1468    ) {
1469        event_stream.send_thinking(&new_text);
1470
1471        let last_message = self.pending_message();
1472        if let Some(AgentMessageContent::Thinking { text, signature }) =
1473            last_message.content.last_mut()
1474        {
1475            text.push_str(&new_text);
1476            *signature = new_signature.or(signature.take());
1477        } else {
1478            last_message.content.push(AgentMessageContent::Thinking {
1479                text: new_text,
1480                signature: new_signature,
1481            });
1482        }
1483
1484        cx.notify();
1485    }
1486
1487    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
1488        let last_message = self.pending_message();
1489        last_message
1490            .content
1491            .push(AgentMessageContent::RedactedThinking(data));
1492        cx.notify();
1493    }
1494
    /// Records a (possibly still streaming) tool use on the pending message and, once its
    /// input is complete, starts running the tool.
    ///
    /// Recording:
    /// - If the pending message doesn't already end with this tool use (matched by id), a
    ///   new tool call is sent on `event_stream` and the tool use is appended.
    /// - Otherwise the last tool use is replaced, and the tool call's title, kind, and raw
    ///   input are updated.
    ///
    /// Returns:
    /// - `None` while the input is still incomplete.
    /// - An already-resolved error result when `self.tool` finds no tool with that name.
    /// - Otherwise, a task resolving to the tool's result. An image output is turned into an
    ///   error when the current model doesn't support images.
    fn handle_tool_use_event(
        &mut self,
        tool_use: LanguageModelToolUse,
        event_stream: &ThreadEventStream,
        cx: &mut Context<Self>,
    ) -> Option<Task<LanguageModelToolResult>> {
        cx.notify();

        // Unknown tools fall back to the raw tool name and `ToolKind::Other`.
        let tool = self.tool(tool_use.name.as_ref());
        let mut title = SharedString::from(&tool_use.name);
        let mut kind = acp::ToolKind::Other;
        if let Some(tool) = tool.as_ref() {
            title = tool.initial_title(tool_use.input.clone(), cx);
            kind = tool.kind();
        }

        // Ensure the last message ends in the current tool use
        let last_message = self.pending_message();
        let push_new_tool_use = last_message.content.last_mut().is_none_or(|content| {
            if let AgentMessageContent::ToolUse(last_tool_use) = content {
                if last_tool_use.id == tool_use.id {
                    *last_tool_use = tool_use.clone();
                    false
                } else {
                    true
                }
            } else {
                true
            }
        });

        if push_new_tool_use {
            event_stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
            last_message
                .content
                .push(AgentMessageContent::ToolUse(tool_use.clone()));
        } else {
            event_stream.update_tool_call_fields(
                &tool_use.id,
                acp::ToolCallUpdateFields {
                    title: Some(title.into()),
                    kind: Some(kind),
                    raw_input: Some(tool_use.input.clone()),
                    ..Default::default()
                },
            );
        }

        // Wait until the input has finished streaming before running anything.
        if !tool_use.is_input_complete {
            return None;
        }

        let Some(tool) = tool else {
            let content = format!("No tool named {} exists", tool_use.name);
            return Some(Task::ready(LanguageModelToolResult {
                content: LanguageModelToolResultContent::Text(Arc::from(content)),
                tool_use_id: tool_use.id,
                tool_name: tool_use.name,
                is_error: true,
                output: None,
            }));
        };

        let fs = self.project.read(cx).fs().clone();
        let tool_event_stream =
            ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
        tool_event_stream.update_fields(acp::ToolCallUpdateFields {
            status: Some(acp::ToolCallStatus::InProgress),
            ..Default::default()
        });
        let supports_images = self.model().is_some_and(|model| model.supports_images());
        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
        log::debug!("Running tool {}", tool_use.name);
        Some(cx.foreground_executor().spawn(async move {
            // Reject image output the current model can't consume.
            let tool_result = tool_result.await.and_then(|output| {
                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
                    && !supports_images
                {
                    return Err(anyhow!(
                        "Attempted to read an image, but this model doesn't support it.",
                    ));
                }
                Ok(output)
            });

            // Convert the outcome into a tool result; failures are reported with `is_error`.
            match tool_result {
                Ok(output) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: false,
                    content: output.llm_output,
                    output: Some(output.raw_output),
                },
                Err(error) => LanguageModelToolResult {
                    tool_use_id: tool_use.id,
                    tool_name: tool_use.name,
                    is_error: true,
                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
                    output: Some(error.to_string().into()),
                },
            }
        }))
    }
1598
1599    fn handle_tool_use_json_parse_error_event(
1600        &mut self,
1601        tool_use_id: LanguageModelToolUseId,
1602        tool_name: Arc<str>,
1603        raw_input: Arc<str>,
1604        json_parse_error: String,
1605    ) -> LanguageModelToolResult {
1606        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
1607        LanguageModelToolResult {
1608            tool_use_id,
1609            tool_name,
1610            is_error: true,
1611            content: LanguageModelToolResultContent::Text(tool_output.into()),
1612            output: Some(serde_json::Value::String(raw_input.to_string())),
1613        }
1614    }
1615
1616    fn update_model_request_usage(&self, amount: usize, limit: UsageLimit, cx: &mut Context<Self>) {
1617        self.project
1618            .read(cx)
1619            .user_store()
1620            .update(cx, |user_store, cx| {
1621                user_store.update_model_request_usage(
1622                    ModelRequestUsage(RequestUsage {
1623                        amount: amount as i32,
1624                        limit,
1625                    }),
1626                    cx,
1627                )
1628            });
1629    }
1630
1631    pub fn title(&self) -> SharedString {
1632        self.title.clone().unwrap_or("New Thread".into())
1633    }
1634
1635    pub fn summary(&mut self, cx: &mut Context<Self>) -> Task<Result<SharedString>> {
1636        if let Some(summary) = self.summary.as_ref() {
1637            return Task::ready(Ok(summary.clone()));
1638        }
1639        let Some(model) = self.summarization_model.clone() else {
1640            return Task::ready(Err(anyhow!("No summarization model available")));
1641        };
1642        let mut request = LanguageModelRequest {
1643            intent: Some(CompletionIntent::ThreadContextSummarization),
1644            temperature: AgentSettings::temperature_for_model(&model, cx),
1645            ..Default::default()
1646        };
1647
1648        for message in &self.messages {
1649            request.messages.extend(message.to_request());
1650        }
1651
1652        request.messages.push(LanguageModelRequestMessage {
1653            role: Role::User,
1654            content: vec![SUMMARIZE_THREAD_DETAILED_PROMPT.into()],
1655            cache: false,
1656        });
1657        cx.spawn(async move |this, cx| {
1658            let mut summary = String::new();
1659            let mut messages = model.stream_completion(request, cx).await?;
1660            while let Some(event) = messages.next().await {
1661                let event = event?;
1662                let text = match event {
1663                    LanguageModelCompletionEvent::Text(text) => text,
1664                    LanguageModelCompletionEvent::StatusUpdate(
1665                        CompletionRequestStatus::UsageUpdated { amount, limit },
1666                    ) => {
1667                        this.update(cx, |thread, cx| {
1668                            thread.update_model_request_usage(amount, limit, cx);
1669                        })?;
1670                        continue;
1671                    }
1672                    _ => continue,
1673                };
1674
1675                let mut lines = text.lines();
1676                summary.extend(lines.next());
1677            }
1678
1679            log::debug!("Setting summary: {}", summary);
1680            let summary = SharedString::from(summary);
1681
1682            this.update(cx, |this, cx| {
1683                this.summary = Some(summary.clone());
1684                cx.notify()
1685            })?;
1686
1687            Ok(summary)
1688        })
1689    }
1690
1691    fn generate_title(&mut self, cx: &mut Context<Self>) {
1692        let Some(model) = self.summarization_model.clone() else {
1693            return;
1694        };
1695
1696        log::debug!(
1697            "Generating title with model: {:?}",
1698            self.summarization_model.as_ref().map(|model| model.name())
1699        );
1700        let mut request = LanguageModelRequest {
1701            intent: Some(CompletionIntent::ThreadSummarization),
1702            temperature: AgentSettings::temperature_for_model(&model, cx),
1703            ..Default::default()
1704        };
1705
1706        for message in &self.messages {
1707            request.messages.extend(message.to_request());
1708        }
1709
1710        request.messages.push(LanguageModelRequestMessage {
1711            role: Role::User,
1712            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
1713            cache: false,
1714        });
1715        self.pending_title_generation = Some(cx.spawn(async move |this, cx| {
1716            let mut title = String::new();
1717
1718            let generate = async {
1719                let mut messages = model.stream_completion(request, cx).await?;
1720                while let Some(event) = messages.next().await {
1721                    let event = event?;
1722                    let text = match event {
1723                        LanguageModelCompletionEvent::Text(text) => text,
1724                        LanguageModelCompletionEvent::StatusUpdate(
1725                            CompletionRequestStatus::UsageUpdated { amount, limit },
1726                        ) => {
1727                            this.update(cx, |thread, cx| {
1728                                thread.update_model_request_usage(amount, limit, cx);
1729                            })?;
1730                            continue;
1731                        }
1732                        _ => continue,
1733                    };
1734
1735                    let mut lines = text.lines();
1736                    title.extend(lines.next());
1737
1738                    // Stop if the LLM generated multiple lines.
1739                    if lines.next().is_some() {
1740                        break;
1741                    }
1742                }
1743                anyhow::Ok(())
1744            };
1745
1746            if generate.await.context("failed to generate title").is_ok() {
1747                _ = this.update(cx, |this, cx| this.set_title(title.into(), cx));
1748            }
1749            _ = this.update(cx, |this, _| this.pending_title_generation = None);
1750        }));
1751    }
1752
1753    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1754        self.pending_title_generation = None;
1755        if Some(&title) != self.title.as_ref() {
1756            self.title = Some(title);
1757            cx.emit(TitleUpdated);
1758            cx.notify();
1759        }
1760    }
1761
1762    fn last_user_message(&self) -> Option<&UserMessage> {
1763        self.messages
1764            .iter()
1765            .rev()
1766            .find_map(|message| match message {
1767                Message::User(user_message) => Some(user_message),
1768                Message::Agent(_) => None,
1769                Message::Resume => None,
1770            })
1771    }
1772
1773    fn pending_message(&mut self) -> &mut AgentMessage {
1774        self.pending_message.get_or_insert_default()
1775    }
1776
1777    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
1778        let Some(mut message) = self.pending_message.take() else {
1779            return;
1780        };
1781
1782        if message.content.is_empty() {
1783            return;
1784        }
1785
1786        for content in &message.content {
1787            let AgentMessageContent::ToolUse(tool_use) = content else {
1788                continue;
1789            };
1790
1791            if !message.tool_results.contains_key(&tool_use.id) {
1792                message.tool_results.insert(
1793                    tool_use.id.clone(),
1794                    LanguageModelToolResult {
1795                        tool_use_id: tool_use.id.clone(),
1796                        tool_name: tool_use.name.clone(),
1797                        is_error: true,
1798                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
1799                        output: None,
1800                    },
1801                );
1802            }
1803        }
1804
1805        self.messages.push(Message::Agent(message));
1806        self.updated_at = Utc::now();
1807        self.summary = None;
1808        cx.notify()
1809    }
1810
1811    pub(crate) fn build_completion_request(
1812        &self,
1813        completion_intent: CompletionIntent,
1814        cx: &App,
1815    ) -> Result<LanguageModelRequest> {
1816        let model = self.model().context("No language model configured")?;
1817        let tools = if let Some(turn) = self.running_turn.as_ref() {
1818            turn.tools
1819                .iter()
1820                .filter_map(|(tool_name, tool)| {
1821                    log::trace!("Including tool: {}", tool_name);
1822                    Some(LanguageModelRequestTool {
1823                        name: tool_name.to_string(),
1824                        description: tool.description().to_string(),
1825                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1826                    })
1827                })
1828                .collect::<Vec<_>>()
1829        } else {
1830            Vec::new()
1831        };
1832
1833        log::debug!("Building completion request");
1834        log::debug!("Completion intent: {:?}", completion_intent);
1835        log::debug!("Completion mode: {:?}", self.completion_mode);
1836
1837        let messages = self.build_request_messages(cx);
1838        log::debug!("Request will include {} messages", messages.len());
1839        log::debug!("Request includes {} tools", tools.len());
1840
1841        let request = LanguageModelRequest {
1842            thread_id: Some(self.id.to_string()),
1843            prompt_id: Some(self.prompt_id.to_string()),
1844            intent: Some(completion_intent),
1845            mode: Some(self.completion_mode.into()),
1846            messages,
1847            tools,
1848            tool_choice: None,
1849            stop: Vec::new(),
1850            temperature: AgentSettings::temperature_for_model(model, cx),
1851            thinking_allowed: true,
1852        };
1853
1854        log::debug!("Completion request built successfully");
1855        Ok(request)
1856    }
1857
1858    fn enabled_tools(
1859        &self,
1860        profile: &AgentProfileSettings,
1861        model: &Arc<dyn LanguageModel>,
1862        cx: &App,
1863    ) -> BTreeMap<SharedString, Arc<dyn AnyAgentTool>> {
1864        fn truncate(tool_name: &SharedString) -> SharedString {
1865            if tool_name.len() > MAX_TOOL_NAME_LENGTH {
1866                let mut truncated = tool_name.to_string();
1867                truncated.truncate(MAX_TOOL_NAME_LENGTH);
1868                truncated.into()
1869            } else {
1870                tool_name.clone()
1871            }
1872        }
1873
1874        let mut tools = self
1875            .tools
1876            .iter()
1877            .filter_map(|(tool_name, tool)| {
1878                if tool.supported_provider(&model.provider_id())
1879                    && profile.is_tool_enabled(tool_name)
1880                {
1881                    Some((truncate(tool_name), tool.clone()))
1882                } else {
1883                    None
1884                }
1885            })
1886            .collect::<BTreeMap<_, _>>();
1887
1888        let mut context_server_tools = Vec::new();
1889        let mut seen_tools = tools.keys().cloned().collect::<HashSet<_>>();
1890        let mut duplicate_tool_names = HashSet::default();
1891        for (server_id, server_tools) in self.context_server_registry.read(cx).servers() {
1892            for (tool_name, tool) in server_tools {
1893                if profile.is_context_server_tool_enabled(&server_id.0, &tool_name) {
1894                    let tool_name = truncate(tool_name);
1895                    if !seen_tools.insert(tool_name.clone()) {
1896                        duplicate_tool_names.insert(tool_name.clone());
1897                    }
1898                    context_server_tools.push((server_id.clone(), tool_name, tool.clone()));
1899                }
1900            }
1901        }
1902
1903        // When there are duplicate tool names, disambiguate by prefixing them
1904        // with the server ID. In the rare case there isn't enough space for the
1905        // disambiguated tool name, keep only the last tool with this name.
1906        for (server_id, tool_name, tool) in context_server_tools {
1907            if duplicate_tool_names.contains(&tool_name) {
1908                let available = MAX_TOOL_NAME_LENGTH.saturating_sub(tool_name.len());
1909                if available >= 2 {
1910                    let mut disambiguated = server_id.0.to_string();
1911                    disambiguated.truncate(available - 1);
1912                    disambiguated.push('_');
1913                    disambiguated.push_str(&tool_name);
1914                    tools.insert(disambiguated.into(), tool.clone());
1915                } else {
1916                    tools.insert(tool_name, tool.clone());
1917                }
1918            } else {
1919                tools.insert(tool_name, tool.clone());
1920            }
1921        }
1922
1923        tools
1924    }
1925
1926    fn tool(&self, name: &str) -> Option<Arc<dyn AnyAgentTool>> {
1927        self.running_turn.as_ref()?.tools.get(name).cloned()
1928    }
1929
1930    fn build_request_messages(&self, cx: &App) -> Vec<LanguageModelRequestMessage> {
1931        log::trace!(
1932            "Building request messages from {} thread messages",
1933            self.messages.len()
1934        );
1935
1936        let system_prompt = SystemPromptTemplate {
1937            project: self.project_context.read(cx),
1938            available_tools: self.tools.keys().cloned().collect(),
1939        }
1940        .render(&self.templates)
1941        .context("failed to build system prompt")
1942        .expect("Invalid template");
1943        let mut messages = vec![LanguageModelRequestMessage {
1944            role: Role::System,
1945            content: vec![system_prompt.into()],
1946            cache: false,
1947        }];
1948        for message in &self.messages {
1949            messages.extend(message.to_request());
1950        }
1951
1952        if let Some(last_message) = messages.last_mut() {
1953            last_message.cache = true;
1954        }
1955
1956        if let Some(message) = self.pending_message.as_ref() {
1957            messages.extend(message.to_request());
1958        }
1959
1960        messages
1961    }
1962
1963    pub fn to_markdown(&self) -> String {
1964        let mut markdown = String::new();
1965        for (ix, message) in self.messages.iter().enumerate() {
1966            if ix > 0 {
1967                markdown.push('\n');
1968            }
1969            markdown.push_str(&message.to_markdown());
1970        }
1971
1972        if let Some(message) = self.pending_message.as_ref() {
1973            markdown.push('\n');
1974            markdown.push_str(&message.to_markdown());
1975        }
1976
1977        markdown
1978    }
1979
1980    fn advance_prompt_id(&mut self) {
1981        self.prompt_id = PromptId::new();
1982    }
1983
    /// Chooses how, and whether, to retry a failed completion request.
    ///
    /// Returns `None` when retrying is not expected to help. Otherwise returns
    /// the delay/backoff and maximum attempt count to use for `error`. Arm order
    /// matters: specific `HttpResponseError` statuses are matched before the
    /// guarded 4xx/5xx arm and the final catch-all.
    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
        use LanguageModelCompletionError::*;
        use http_client::StatusCode;

        // General strategy here:
        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to
        //   `MAX_RETRY_ATTEMPTS` times, waiting for the provider's `retry_after` when one is given
        //   (a bare HTTP 429 response uses exponential backoff instead).
        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry up to 3 times.
        // - Remaining errors get a small fixed number of attempts (1 or 2).
        match error {
            HttpResponseError {
                status_code: StatusCode::TOO_MANY_REQUESTS,
                ..
            } => Some(RetryStrategy::ExponentialBackoff {
                initial_delay: BASE_RETRY_DELAY,
                max_attempts: MAX_RETRY_ATTEMPTS,
            }),
            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
                Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    max_attempts: MAX_RETRY_ATTEMPTS,
                })
            }
            UpstreamProviderError {
                status,
                retry_after,
                ..
            } => match *status {
                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
                    Some(RetryStrategy::Fixed {
                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                        max_attempts: MAX_RETRY_ATTEMPTS,
                    })
                }
                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    // Internal Server Error could be anything, retry up to 3 times.
                    max_attempts: 3,
                }),
                status => {
                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
                    // but we frequently get them in practice. See https://http.dev/529
                    if status.as_u16() == 529 {
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: MAX_RETRY_ATTEMPTS,
                        })
                    } else {
                        // Any other upstream status: retry up to 2 times.
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: 2,
                        })
                    }
                }
            },
            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            ApiReadResponseError { .. }
            | HttpSend { .. }
            | DeserializeResponse { .. }
            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            // Retrying these errors definitely shouldn't help.
            HttpResponseError {
                status_code:
                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
                ..
            }
            | AuthenticationError { .. }
            | PermissionError { .. }
            | NoApiKey { .. }
            | ApiEndpointNotFound { .. }
            | PromptTooLarge { .. } => None,
            // These errors might be transient, so retry them once.
            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 1,
            }),
            // Retry all other 4xx and 5xx errors up to 3 times (429 and the
            // non-retryable statuses are handled by earlier arms).
            HttpResponseError { status_code, .. }
                if status_code.is_client_error() || status_code.is_server_error() =>
            {
                Some(RetryStrategy::Fixed {
                    delay: BASE_RETRY_DELAY,
                    max_attempts: 3,
                })
            }
            Other(err)
                if err.is::<language_model::PaymentRequiredError>()
                    || err.is::<language_model::ModelRequestLimitReachedError>() =>
            {
                // Retrying won't help for Payment Required or Model Request Limit errors (where
                // the user must upgrade to usage-based billing to get more requests, or else wait
                // for a significant amount of time for the request limit to reset).
                None
            }
            // Everything else (HTTP responses outside 4xx/5xx and any other `Other`
            // error) is retried up to 2 times.
            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 2,
            }),
        }
    }
2090}
2091
/// State for the turn currently being processed by a thread.
///
/// NOTE: Rust drops fields in declaration order, so `_task` is dropped before
/// `event_stream` and `tools`; keep that in mind before reordering fields.
struct RunningTurn {
    /// Holds the task that handles agent interaction until the end of the turn.
    /// It survives across multiple requests while the model makes tool calls
    /// and we run those tools and report their results.
    _task: Task<()>,
    /// The current event stream for the running turn. Used to report a final
    /// cancellation event if we cancel the turn.
    event_stream: ThreadEventStream,
    /// The tools that were enabled for this turn.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
}
2103
impl RunningTurn {
    /// Ends the turn. It sends a `Stop(Cancelled)` event on the turn's event
    /// stream, then drops the turn, including `_task` (presumably cancelling
    /// it, per gpui `Task` drop semantics; confirm).
    fn cancel(self) {
        log::debug!("Cancelling in progress turn");
        self.event_stream.send_canceled();
    }
}
2110
/// Event emitted by a `Thread` carrying its latest token usage.
///
/// NOTE(review): `None` presumably means usage is unknown or was reset; confirm
/// against the emitter.
pub struct TokenUsageUpdated(pub Option<acp_thread::TokenUsage>);

impl EventEmitter<TokenUsageUpdated> for Thread {}
2114
/// Event emitted by a `Thread` when its title changes (see `Thread::set_title`).
pub struct TitleUpdated;

impl EventEmitter<TitleUpdated> for Thread {}
2118
/// A strongly typed tool that the agent can invoke.
///
/// Implementors declare typed `Input` and `Output` types. `erase` wraps the
/// tool in `Erased` so it can be stored as an `Arc<dyn AnyAgentTool>`, which
/// converts those types to and from JSON.
pub trait AgentTool
where
    Self: 'static + Sized,
{
    /// Input deserialized from the model's tool call. Its JSON schema is what
    /// `input_schema` returns by default.
    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
    /// Output of a run. It is serialized as the raw output and converted into
    /// the content returned to the model.
    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;

    /// The tool's name.
    fn name() -> &'static str;

    /// Human-readable description of the tool.
    ///
    /// Defaults to the top-level `description` field of `Self::Input`'s JSON
    /// schema, or an empty string when the schema has none.
    fn description(&self) -> SharedString {
        let schema = schemars::schema_for!(Self::Input);
        SharedString::new(
            schema
                .get("description")
                .and_then(|description| description.as_str())
                .unwrap_or_default(),
        )
    }

    /// The kind of tool.
    fn kind() -> acp::ToolKind;

    /// The initial tool title to display. Can be updated during the tool run.
    ///
    /// `input` is `Err` with the raw JSON when it does not deserialize into
    /// `Self::Input` (possibly because it is still incomplete).
    fn initial_title(
        &self,
        input: Result<Self::Input, serde_json::Value>,
        cx: &mut App,
    ) -> SharedString;

    /// Returns the JSON schema that describes the tool's input.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Schema {
        crate::tool_schema::root_schema_for::<Self::Input>(format)
    }

    /// Some tools rely on a provider for the underlying billing or other reasons.
    /// Allow the tool to check if they are compatible, or should be filtered out.
    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }

    /// Runs the tool with the provided input.
    fn run(
        self: Arc<Self>,
        input: Self::Input,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<Self::Output>>;

    /// Emits events for a previous execution of the tool. Does nothing by default.
    fn replay(
        &self,
        _input: Self::Input,
        _output: Self::Output,
        _event_stream: ToolCallEventStream,
        _cx: &mut App,
    ) -> Result<()> {
        Ok(())
    }

    /// Wraps the tool in `Erased` and returns it as a type-erased `AnyAgentTool`.
    fn erase(self) -> Arc<dyn AnyAgentTool> {
        Arc::new(Erased(Arc::new(self)))
    }
}
2181
/// Wrapper that adapts an `AgentTool` (held as `Erased<Arc<T>>`) to the
/// object-safe `AnyAgentTool` trait.
pub struct Erased<T>(T);
2183
/// Output of running a type-erased tool.
pub struct AgentToolOutput {
    /// Content returned to the language model.
    pub llm_output: LanguageModelToolResultContent,
    /// The tool's typed output serialized to JSON.
    pub raw_output: serde_json::Value,
}
2188
/// Object-safe, JSON-based interface to an agent tool, used to store tools of
/// different concrete types together as `Arc<dyn AnyAgentTool>`.
pub trait AnyAgentTool {
    /// The tool's name.
    fn name(&self) -> SharedString;
    /// Human-readable description of the tool.
    fn description(&self) -> SharedString;
    /// The kind of tool.
    fn kind(&self) -> acp::ToolKind;
    /// Initial title for a tool call whose raw JSON input is `input`.
    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString;
    /// JSON schema of the tool's input, adapted to `format`.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
    /// Whether the tool may be offered to models from this provider. Defaults to `true`.
    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }
    /// Runs the tool with raw JSON `input`, reporting progress on `event_stream`.
    fn run(
        self: Arc<Self>,
        input: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<AgentToolOutput>>;
    /// Re-emits events for a previous run, given its raw JSON input and output.
    fn replay(
        &self,
        input: serde_json::Value,
        output: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Result<()>;
}
2212
2213impl<T> AnyAgentTool for Erased<Arc<T>>
2214where
2215    T: AgentTool,
2216{
2217    fn name(&self) -> SharedString {
2218        T::name().into()
2219    }
2220
2221    fn description(&self) -> SharedString {
2222        self.0.description()
2223    }
2224
2225    fn kind(&self) -> agent_client_protocol::ToolKind {
2226        T::kind()
2227    }
2228
2229    fn initial_title(&self, input: serde_json::Value, _cx: &mut App) -> SharedString {
2230        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
2231        self.0.initial_title(parsed_input, _cx)
2232    }
2233
2234    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
2235        let mut json = serde_json::to_value(self.0.input_schema(format))?;
2236        adapt_schema_to_format(&mut json, format)?;
2237        Ok(json)
2238    }
2239
2240    fn supported_provider(&self, provider: &LanguageModelProviderId) -> bool {
2241        self.0.supported_provider(provider)
2242    }
2243
2244    fn run(
2245        self: Arc<Self>,
2246        input: serde_json::Value,
2247        event_stream: ToolCallEventStream,
2248        cx: &mut App,
2249    ) -> Task<Result<AgentToolOutput>> {
2250        cx.spawn(async move |cx| {
2251            let input = serde_json::from_value(input)?;
2252            let output = cx
2253                .update(|cx| self.0.clone().run(input, event_stream, cx))?
2254                .await?;
2255            let raw_output = serde_json::to_value(&output)?;
2256            Ok(AgentToolOutput {
2257                llm_output: output.into(),
2258                raw_output,
2259            })
2260        })
2261    }
2262
2263    fn replay(
2264        &self,
2265        input: serde_json::Value,
2266        output: serde_json::Value,
2267        event_stream: ToolCallEventStream,
2268        cx: &mut App,
2269    ) -> Result<()> {
2270        let input = serde_json::from_value(input)?;
2271        let output = serde_json::from_value(output)?;
2272        self.0.replay(input, output, event_stream, cx)
2273    }
2274}
2275
/// Sending half of the unbounded channel that carries a thread's events (or
/// errors) to whoever is consuming the turn. Clones share the same channel.
#[derive(Clone)]
struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
2278
2279impl ThreadEventStream {
2280    fn send_user_message(&self, message: &UserMessage) {
2281        self.0
2282            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
2283            .ok();
2284    }
2285
2286    fn send_text(&self, text: &str) {
2287        self.0
2288            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
2289            .ok();
2290    }
2291
2292    fn send_thinking(&self, text: &str) {
2293        self.0
2294            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
2295            .ok();
2296    }
2297
2298    fn send_tool_call(
2299        &self,
2300        id: &LanguageModelToolUseId,
2301        title: SharedString,
2302        kind: acp::ToolKind,
2303        input: serde_json::Value,
2304    ) {
2305        self.0
2306            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
2307                id,
2308                title.to_string(),
2309                kind,
2310                input,
2311            ))))
2312            .ok();
2313    }
2314
2315    fn initial_tool_call(
2316        id: &LanguageModelToolUseId,
2317        title: String,
2318        kind: acp::ToolKind,
2319        input: serde_json::Value,
2320    ) -> acp::ToolCall {
2321        acp::ToolCall {
2322            meta: None,
2323            id: acp::ToolCallId(id.to_string().into()),
2324            title,
2325            kind,
2326            status: acp::ToolCallStatus::Pending,
2327            content: vec![],
2328            locations: vec![],
2329            raw_input: Some(input),
2330            raw_output: None,
2331        }
2332    }
2333
2334    fn update_tool_call_fields(
2335        &self,
2336        tool_use_id: &LanguageModelToolUseId,
2337        fields: acp::ToolCallUpdateFields,
2338    ) {
2339        self.0
2340            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2341                acp::ToolCallUpdate {
2342                    meta: None,
2343                    id: acp::ToolCallId(tool_use_id.to_string().into()),
2344                    fields,
2345                }
2346                .into(),
2347            )))
2348            .ok();
2349    }
2350
2351    fn send_retry(&self, status: acp_thread::RetryStatus) {
2352        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
2353    }
2354
2355    fn send_stop(&self, reason: acp::StopReason) {
2356        self.0.unbounded_send(Ok(ThreadEvent::Stop(reason))).ok();
2357    }
2358
2359    fn send_canceled(&self) {
2360        self.0
2361            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2362            .ok();
2363    }
2364
2365    fn send_error(&self, error: impl Into<anyhow::Error>) {
2366        self.0.unbounded_send(Err(error.into())).ok();
2367    }
2368}
2369
/// Event stream scoped to a single tool call, handed to a tool while it runs.
#[derive(Clone)]
pub struct ToolCallEventStream {
    /// ID of the tool call that every event from this stream refers to.
    tool_use_id: LanguageModelToolUseId,
    /// The thread-level stream the events are forwarded to.
    stream: ThreadEventStream,
    /// Filesystem access for the tool; `None` in `ToolCallEventStream::test`.
    fs: Option<Arc<dyn Fs>>,
}
2376
2377impl ToolCallEventStream {
2378    #[cfg(test)]
2379    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
2380        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
2381
2382        let stream = ToolCallEventStream::new("test_id".into(), ThreadEventStream(events_tx), None);
2383
2384        (stream, ToolCallEventStreamReceiver(events_rx))
2385    }
2386
2387    fn new(
2388        tool_use_id: LanguageModelToolUseId,
2389        stream: ThreadEventStream,
2390        fs: Option<Arc<dyn Fs>>,
2391    ) -> Self {
2392        Self {
2393            tool_use_id,
2394            stream,
2395            fs,
2396        }
2397    }
2398
2399    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
2400        self.stream
2401            .update_tool_call_fields(&self.tool_use_id, fields);
2402    }
2403
2404    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2405        self.stream
2406            .0
2407            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2408                acp_thread::ToolCallUpdateDiff {
2409                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2410                    diff,
2411                }
2412                .into(),
2413            )))
2414            .ok();
2415    }
2416
2417    pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
2418        if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
2419            return Task::ready(Ok(()));
2420        }
2421
2422        let (response_tx, response_rx) = oneshot::channel();
2423        self.stream
2424            .0
2425            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
2426                ToolCallAuthorization {
2427                    tool_call: acp::ToolCallUpdate {
2428                        meta: None,
2429                        id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2430                        fields: acp::ToolCallUpdateFields {
2431                            title: Some(title.into()),
2432                            ..Default::default()
2433                        },
2434                    },
2435                    options: vec![
2436                        acp::PermissionOption {
2437                            id: acp::PermissionOptionId("always_allow".into()),
2438                            name: "Always Allow".into(),
2439                            kind: acp::PermissionOptionKind::AllowAlways,
2440                            meta: None,
2441                        },
2442                        acp::PermissionOption {
2443                            id: acp::PermissionOptionId("allow".into()),
2444                            name: "Allow".into(),
2445                            kind: acp::PermissionOptionKind::AllowOnce,
2446                            meta: None,
2447                        },
2448                        acp::PermissionOption {
2449                            id: acp::PermissionOptionId("deny".into()),
2450                            name: "Deny".into(),
2451                            kind: acp::PermissionOptionKind::RejectOnce,
2452                            meta: None,
2453                        },
2454                    ],
2455                    response: response_tx,
2456                },
2457            )))
2458            .ok();
2459        let fs = self.fs.clone();
2460        cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
2461            "always_allow" => {
2462                if let Some(fs) = fs.clone() {
2463                    cx.update(|cx| {
2464                        update_settings_file(fs, cx, |settings, _| {
2465                            settings
2466                                .agent
2467                                .get_or_insert_default()
2468                                .set_always_allow_tool_actions(true);
2469                        });
2470                    })?;
2471                }
2472
2473                Ok(())
2474            }
2475            "allow" => Ok(()),
2476            _ => Err(anyhow!("Permission to run tool denied by user")),
2477        })
2478    }
2479}
2480
/// Test-only receiving end of the channel behind `ToolCallEventStream::test`.
///
/// Its helper methods await the next event and panic if it is not of the expected kind.
#[cfg(test)]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
2483
#[cfg(test)]
impl ToolCallEventStreamReceiver {
    /// Awaits the next event and returns it if it is a tool-call authorization request.
    ///
    /// # Panics
    /// Panics if the next item is anything else, including an error or end of stream.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) => auth,
            other => panic!("Expected ToolCallAuthorization but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns its fields if it is a field update.
    ///
    /// # Panics
    /// Panics if the next item is anything else, including an error or end of stream.
    pub async fn expect_update_fields(&mut self) -> acp::ToolCallUpdateFields {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            )))) => update.fields,
            other => panic!("Expected update fields but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the diff entity if it is a diff update.
    ///
    /// # Panics
    /// Panics if the next item is anything else, including an error or end of stream.
    pub async fn expect_diff(&mut self) -> Entity<acp_thread::Diff> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateDiff(
                update,
            )))) => update.diff,
            other => panic!("Expected diff but got: {:?}", other),
        }
    }

    /// Awaits the next event and returns the terminal entity if it is a terminal update.
    ///
    /// # Panics
    /// Panics if the next item is anything else, including an error or end of stream.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateTerminal(
                update,
            )))) => update.terminal,
            other => panic!("Expected terminal but got: {:?}", other),
        }
    }
}
2531
#[cfg(test)]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    /// Exposes the wrapped receiver so tests can use the raw channel API directly.
    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2540
#[cfg(test)]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    /// Mutable access to the wrapped receiver (needed to poll it as a stream).
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2547
2548impl From<&str> for UserMessageContent {
2549    fn from(text: &str) -> Self {
2550        Self::Text(text.into())
2551    }
2552}
2553
2554impl From<acp::ContentBlock> for UserMessageContent {
2555    fn from(value: acp::ContentBlock) -> Self {
2556        match value {
2557            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
2558            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
2559            acp::ContentBlock::Audio(_) => {
2560                // TODO
2561                Self::Text("[audio]".to_string())
2562            }
2563            acp::ContentBlock::ResourceLink(resource_link) => {
2564                match MentionUri::parse(&resource_link.uri) {
2565                    Ok(uri) => Self::Mention {
2566                        uri,
2567                        content: String::new(),
2568                    },
2569                    Err(err) => {
2570                        log::error!("Failed to parse mention link: {}", err);
2571                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
2572                    }
2573                }
2574            }
2575            acp::ContentBlock::Resource(resource) => match resource.resource {
2576                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
2577                    match MentionUri::parse(&resource.uri) {
2578                        Ok(uri) => Self::Mention {
2579                            uri,
2580                            content: resource.text,
2581                        },
2582                        Err(err) => {
2583                            log::error!("Failed to parse mention link: {}", err);
2584                            Self::Text(
2585                                MarkdownCodeBlock {
2586                                    tag: &resource.uri,
2587                                    text: &resource.text,
2588                                }
2589                                .to_string(),
2590                            )
2591                        }
2592                    }
2593                }
2594                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
2595                    // TODO
2596                    Self::Text("[blob]".to_string())
2597                }
2598            },
2599        }
2600    }
2601}
2602
2603impl From<UserMessageContent> for acp::ContentBlock {
2604    fn from(content: UserMessageContent) -> Self {
2605        match content {
2606            UserMessageContent::Text(text) => acp::ContentBlock::Text(acp::TextContent {
2607                text,
2608                annotations: None,
2609                meta: None,
2610            }),
2611            UserMessageContent::Image(image) => acp::ContentBlock::Image(acp::ImageContent {
2612                data: image.source.to_string(),
2613                mime_type: "image/png".to_string(),
2614                meta: None,
2615                annotations: None,
2616                uri: None,
2617            }),
2618            UserMessageContent::Mention { uri, content } => {
2619                acp::ContentBlock::Resource(acp::EmbeddedResource {
2620                    meta: None,
2621                    resource: acp::EmbeddedResourceResource::TextResourceContents(
2622                        acp::TextResourceContents {
2623                            meta: None,
2624                            mime_type: None,
2625                            text: content,
2626                            uri: uri.to_uri().to_string(),
2627                        },
2628                    ),
2629                    annotations: None,
2630                })
2631            }
2632        }
2633    }
2634}
2635
2636fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
2637    LanguageModelImage {
2638        source: image_content.data.into(),
2639        // TODO: make this optional?
2640        size: gpui::Size::new(0.into(), 0.into()),
2641    }
2642}