thread.rs

   1use crate::{ContextServerRegistry, SystemPromptTemplate, Template, Templates};
   2use acp_thread::{MentionUri, UserMessageId};
   3use action_log::ActionLog;
   4use agent::thread::{DetailedSummaryState, GitState, ProjectSnapshot, WorktreeSnapshot};
   5use agent_client_protocol as acp;
   6use agent_settings::{AgentProfileId, AgentSettings, CompletionMode, SUMMARIZE_THREAD_PROMPT};
   7use anyhow::{Context as _, Result, anyhow};
   8use assistant_tool::adapt_schema_to_format;
   9use chrono::{DateTime, Utc};
  10use cloud_llm_client::{CompletionIntent, CompletionRequestStatus};
  11use collections::IndexMap;
  12use fs::Fs;
  13use futures::{
  14    FutureExt,
  15    channel::{mpsc, oneshot},
  16    future::Shared,
  17    stream::FuturesUnordered,
  18};
  19use git::repository::DiffType;
  20use gpui::{App, AsyncApp, Context, Entity, SharedString, Task, WeakEntity};
  21use language_model::{
  22    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelImage,
  23    LanguageModelProviderId, LanguageModelRequest, LanguageModelRequestMessage,
  24    LanguageModelRequestTool, LanguageModelToolResult, LanguageModelToolResultContent,
  25    LanguageModelToolSchemaFormat, LanguageModelToolUse, LanguageModelToolUseId, Role, StopReason,
  26    TokenUsage,
  27};
  28use project::{
  29    Project,
  30    git_store::{GitStore, RepositoryState},
  31};
  32use prompt_store::ProjectContext;
  33use schemars::{JsonSchema, Schema};
  34use serde::{Deserialize, Serialize};
  35use settings::{Settings, update_settings_file};
  36use smol::stream::StreamExt;
  37use std::{
  38    collections::BTreeMap,
  39    path::Path,
  40    sync::Arc,
  41    time::{Duration, Instant},
  42};
  43use std::{fmt::Write, ops::Range};
  44use util::{ResultExt, markdown::MarkdownCodeBlock};
  45use uuid::Uuid;
  46
/// Text describing a tool call that the user canceled.
/// NOTE(review): the code that emits this message is outside this view — confirm where it is used.
const TOOL_CANCELED_MESSAGE: &str = "Tool canceled by user";
  48
/// The ID of the user prompt that initiated a request.
///
/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
///
/// The ID is stored as an opaque, cheaply clonable string; `PromptId::new` fills it
/// with a freshly generated v4 UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct PromptId(Arc<str>);
  54
  55impl PromptId {
  56    pub fn new() -> Self {
  57        Self(Uuid::new_v4().to_string().into())
  58    }
  59}
  60
  61impl std::fmt::Display for PromptId {
  62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  63        write!(f, "{}", self.0)
  64    }
  65}
  66
/// Upper bound on retry attempts.
/// NOTE(review): the retry logic that reads this constant is outside this view.
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 4;
/// Base retry delay of five seconds.
/// NOTE(review): presumably the starting delay of the retry schedule — confirm against the retry logic.
pub(crate) const BASE_RETRY_DELAY: Duration = Duration::from_secs(5);
  69
/// Describes a retry schedule.
///
/// NOTE(review): the code that selects and applies a strategy is outside this view;
/// the variant descriptions below are inferred from the field names and should be confirmed.
#[derive(Debug, Clone)]
enum RetryStrategy {
    /// Presumably the delay starts at `initial_delay` and grows on each attempt,
    /// giving up after `max_attempts` — confirm against the retry loop.
    ExponentialBackoff {
        initial_delay: Duration,
        max_attempts: u8,
    },
    /// Presumably waits the same `delay` before every attempt, giving up after
    /// `max_attempts` — confirm against the retry loop.
    Fixed {
        delay: Duration,
        max_attempts: u8,
    },
}
  81
/// A single entry in a thread's message history.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Message {
    /// A message submitted by the user.
    User(UserMessage),
    /// A message produced by the agent, together with the results of its tool calls.
    Agent(AgentMessage),
    /// Marker pushed by `Thread::resume`; it is sent to the model as a user-role
    /// "Continue where you left off" message.
    Resume,
}
  88
  89impl Message {
  90    pub fn as_agent_message(&self) -> Option<&AgentMessage> {
  91        match self {
  92            Message::Agent(agent_message) => Some(agent_message),
  93            _ => None,
  94        }
  95    }
  96
  97    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
  98        match self {
  99            Message::User(message) => vec![message.to_request()],
 100            Message::Agent(message) => message.to_request(),
 101            Message::Resume => vec![LanguageModelRequestMessage {
 102                role: Role::User,
 103                content: vec!["Continue where you left off".into()],
 104                cache: false,
 105            }],
 106        }
 107    }
 108
 109    pub fn to_markdown(&self) -> String {
 110        match self {
 111            Message::User(message) => message.to_markdown(),
 112            Message::Agent(message) => message.to_markdown(),
 113            Message::Resume => "[resumed after tool use limit was reached]".into(),
 114        }
 115    }
 116
 117    pub fn role(&self) -> Role {
 118        match self {
 119            Message::User(_) | Message::Resume => Role::User,
 120            Message::Agent(_) => Role::Assistant,
 121        }
 122    }
 123}
 124
/// A message submitted by the user, made up of an ordered list of content chunks.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserMessage {
    /// Identifier of this message; `Thread::truncate` looks messages up by it.
    pub id: UserMessageId,
    /// The chunks of the message, in the order they are rendered and sent.
    pub content: Vec<UserMessageContent>,
}
 130
/// One chunk of a [`UserMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserMessageContent {
    /// Plain text.
    Text(String),
    /// A mention identified by `uri`, together with the text content attached for it.
    Mention { uri: MentionUri, content: String },
    /// An image.
    Image(LanguageModelImage),
}
 137
 138impl UserMessage {
 139    pub fn to_markdown(&self) -> String {
 140        let mut markdown = String::from("## User\n\n");
 141
 142        for content in &self.content {
 143            match content {
 144                UserMessageContent::Text(text) => {
 145                    markdown.push_str(text);
 146                    markdown.push('\n');
 147                }
 148                UserMessageContent::Image(_) => {
 149                    markdown.push_str("<image />\n");
 150                }
 151                UserMessageContent::Mention { uri, content } => {
 152                    if !content.is_empty() {
 153                        let _ = write!(&mut markdown, "{}\n\n{}\n", uri.as_link(), content);
 154                    } else {
 155                        let _ = write!(&mut markdown, "{}\n", uri.as_link());
 156                    }
 157                }
 158            }
 159        }
 160
 161        markdown
 162    }
 163
 164    fn to_request(&self) -> LanguageModelRequestMessage {
 165        let mut message = LanguageModelRequestMessage {
 166            role: Role::User,
 167            content: Vec::with_capacity(self.content.len()),
 168            cache: false,
 169        };
 170
 171        const OPEN_CONTEXT: &str = "<context>\n\
 172            The following items were attached by the user. \
 173            They are up-to-date and don't need to be re-read.\n\n";
 174
 175        const OPEN_FILES_TAG: &str = "<files>";
 176        const OPEN_DIRECTORIES_TAG: &str = "<directories>";
 177        const OPEN_SYMBOLS_TAG: &str = "<symbols>";
 178        const OPEN_THREADS_TAG: &str = "<threads>";
 179        const OPEN_FETCH_TAG: &str = "<fetched_urls>";
 180        const OPEN_RULES_TAG: &str =
 181            "<rules>\nThe user has specified the following rules that should be applied:\n";
 182
 183        let mut file_context = OPEN_FILES_TAG.to_string();
 184        let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
 185        let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
 186        let mut thread_context = OPEN_THREADS_TAG.to_string();
 187        let mut fetch_context = OPEN_FETCH_TAG.to_string();
 188        let mut rules_context = OPEN_RULES_TAG.to_string();
 189
 190        for chunk in &self.content {
 191            let chunk = match chunk {
 192                UserMessageContent::Text(text) => {
 193                    language_model::MessageContent::Text(text.clone())
 194                }
 195                UserMessageContent::Image(value) => {
 196                    language_model::MessageContent::Image(value.clone())
 197                }
 198                UserMessageContent::Mention { uri, content } => {
 199                    match uri {
 200                        MentionUri::File { abs_path } => {
 201                            write!(
 202                                &mut symbol_context,
 203                                "\n{}",
 204                                MarkdownCodeBlock {
 205                                    tag: &codeblock_tag(abs_path, None),
 206                                    text: &content.to_string(),
 207                                }
 208                            )
 209                            .ok();
 210                        }
 211                        MentionUri::Directory { .. } => {
 212                            write!(&mut directory_context, "\n{}\n", content).ok();
 213                        }
 214                        MentionUri::Symbol {
 215                            path, line_range, ..
 216                        }
 217                        | MentionUri::Selection {
 218                            path, line_range, ..
 219                        } => {
 220                            write!(
 221                                &mut rules_context,
 222                                "\n{}",
 223                                MarkdownCodeBlock {
 224                                    tag: &codeblock_tag(path, Some(line_range)),
 225                                    text: content
 226                                }
 227                            )
 228                            .ok();
 229                        }
 230                        MentionUri::Thread { .. } => {
 231                            write!(&mut thread_context, "\n{}\n", content).ok();
 232                        }
 233                        MentionUri::TextThread { .. } => {
 234                            write!(&mut thread_context, "\n{}\n", content).ok();
 235                        }
 236                        MentionUri::Rule { .. } => {
 237                            write!(
 238                                &mut rules_context,
 239                                "\n{}",
 240                                MarkdownCodeBlock {
 241                                    tag: "",
 242                                    text: content
 243                                }
 244                            )
 245                            .ok();
 246                        }
 247                        MentionUri::Fetch { url } => {
 248                            write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
 249                        }
 250                    }
 251
 252                    language_model::MessageContent::Text(uri.as_link().to_string())
 253                }
 254            };
 255
 256            message.content.push(chunk);
 257        }
 258
 259        let len_before_context = message.content.len();
 260
 261        if file_context.len() > OPEN_FILES_TAG.len() {
 262            file_context.push_str("</files>\n");
 263            message
 264                .content
 265                .push(language_model::MessageContent::Text(file_context));
 266        }
 267
 268        if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
 269            directory_context.push_str("</directories>\n");
 270            message
 271                .content
 272                .push(language_model::MessageContent::Text(directory_context));
 273        }
 274
 275        if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
 276            symbol_context.push_str("</symbols>\n");
 277            message
 278                .content
 279                .push(language_model::MessageContent::Text(symbol_context));
 280        }
 281
 282        if thread_context.len() > OPEN_THREADS_TAG.len() {
 283            thread_context.push_str("</threads>\n");
 284            message
 285                .content
 286                .push(language_model::MessageContent::Text(thread_context));
 287        }
 288
 289        if fetch_context.len() > OPEN_FETCH_TAG.len() {
 290            fetch_context.push_str("</fetched_urls>\n");
 291            message
 292                .content
 293                .push(language_model::MessageContent::Text(fetch_context));
 294        }
 295
 296        if rules_context.len() > OPEN_RULES_TAG.len() {
 297            rules_context.push_str("</user_rules>\n");
 298            message
 299                .content
 300                .push(language_model::MessageContent::Text(rules_context));
 301        }
 302
 303        if message.content.len() > len_before_context {
 304            message.content.insert(
 305                len_before_context,
 306                language_model::MessageContent::Text(OPEN_CONTEXT.into()),
 307            );
 308            message
 309                .content
 310                .push(language_model::MessageContent::Text("</context>".into()));
 311        }
 312
 313        message
 314    }
 315}
 316
 317fn codeblock_tag(full_path: &Path, line_range: Option<&Range<u32>>) -> String {
 318    let mut result = String::new();
 319
 320    if let Some(extension) = full_path.extension().and_then(|ext| ext.to_str()) {
 321        let _ = write!(result, "{} ", extension);
 322    }
 323
 324    let _ = write!(result, "{}", full_path.display());
 325
 326    if let Some(range) = line_range {
 327        if range.start == range.end {
 328            let _ = write!(result, ":{}", range.start + 1);
 329        } else {
 330            let _ = write!(result, ":{}-{}", range.start + 1, range.end + 1);
 331        }
 332    }
 333
 334    result
 335}
 336
 337impl AgentMessage {
 338    pub fn to_markdown(&self) -> String {
 339        let mut markdown = String::from("## Assistant\n\n");
 340
 341        for content in &self.content {
 342            match content {
 343                AgentMessageContent::Text(text) => {
 344                    markdown.push_str(text);
 345                    markdown.push('\n');
 346                }
 347                AgentMessageContent::Thinking { text, .. } => {
 348                    markdown.push_str("<think>");
 349                    markdown.push_str(text);
 350                    markdown.push_str("</think>\n");
 351                }
 352                AgentMessageContent::RedactedThinking(_) => {
 353                    markdown.push_str("<redacted_thinking />\n")
 354                }
 355                AgentMessageContent::ToolUse(tool_use) => {
 356                    markdown.push_str(&format!(
 357                        "**Tool Use**: {} (ID: {})\n",
 358                        tool_use.name, tool_use.id
 359                    ));
 360                    markdown.push_str(&format!(
 361                        "{}\n",
 362                        MarkdownCodeBlock {
 363                            tag: "json",
 364                            text: &format!("{:#}", tool_use.input)
 365                        }
 366                    ));
 367                }
 368            }
 369        }
 370
 371        for tool_result in self.tool_results.values() {
 372            markdown.push_str(&format!(
 373                "**Tool Result**: {} (ID: {})\n\n",
 374                tool_result.tool_name, tool_result.tool_use_id
 375            ));
 376            if tool_result.is_error {
 377                markdown.push_str("**ERROR:**\n");
 378            }
 379
 380            match &tool_result.content {
 381                LanguageModelToolResultContent::Text(text) => {
 382                    writeln!(markdown, "{text}\n").ok();
 383                }
 384                LanguageModelToolResultContent::Image(_) => {
 385                    writeln!(markdown, "<image />\n").ok();
 386                }
 387            }
 388
 389            if let Some(output) = tool_result.output.as_ref() {
 390                writeln!(
 391                    markdown,
 392                    "**Debug Output**:\n\n```json\n{}\n```\n",
 393                    serde_json::to_string_pretty(output).unwrap()
 394                )
 395                .unwrap();
 396            }
 397        }
 398
 399        markdown
 400    }
 401
 402    pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
 403        let mut assistant_message = LanguageModelRequestMessage {
 404            role: Role::Assistant,
 405            content: Vec::with_capacity(self.content.len()),
 406            cache: false,
 407        };
 408        for chunk in &self.content {
 409            let chunk = match chunk {
 410                AgentMessageContent::Text(text) => {
 411                    language_model::MessageContent::Text(text.clone())
 412                }
 413                AgentMessageContent::Thinking { text, signature } => {
 414                    language_model::MessageContent::Thinking {
 415                        text: text.clone(),
 416                        signature: signature.clone(),
 417                    }
 418                }
 419                AgentMessageContent::RedactedThinking(value) => {
 420                    language_model::MessageContent::RedactedThinking(value.clone())
 421                }
 422                AgentMessageContent::ToolUse(value) => {
 423                    language_model::MessageContent::ToolUse(value.clone())
 424                }
 425            };
 426            assistant_message.content.push(chunk);
 427        }
 428
 429        let mut user_message = LanguageModelRequestMessage {
 430            role: Role::User,
 431            content: Vec::new(),
 432            cache: false,
 433        };
 434
 435        for tool_result in self.tool_results.values() {
 436            user_message
 437                .content
 438                .push(language_model::MessageContent::ToolResult(
 439                    tool_result.clone(),
 440                ));
 441        }
 442
 443        let mut messages = Vec::new();
 444        if !assistant_message.content.is_empty() {
 445            messages.push(assistant_message);
 446        }
 447        if !user_message.content.is_empty() {
 448            messages.push(user_message);
 449        }
 450        messages
 451    }
 452}
 453
/// A message produced by the agent, plus the results of the tool calls it made.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentMessage {
    /// The chunks of the message, in order.
    pub content: Vec<AgentMessageContent>,
    /// Tool results keyed by the ID of the tool use they answer.
    pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
}
 459
/// One chunk of an [`AgentMessage`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentMessageContent {
    /// Plain response text.
    Text(String),
    /// Thinking text, with an optional signature that is passed back to the model unchanged.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Redacted thinking; rendered in markdown only as a `<redacted_thinking />` placeholder.
    RedactedThinking(String),
    /// A tool call requested by the model.
    ToolUse(LanguageModelToolUse),
}
 470
/// An event delivered over a thread's event channel (see the receivers returned by
/// `Thread::replay` and `Thread::resume`).
///
/// NOTE(review): the code that emits most of these variants is outside this view; the
/// variant names are the only description available here.
#[derive(Debug)]
pub enum ThreadEvent {
    UserMessage(UserMessage),
    AgentText(String),
    AgentThinking(String),
    ToolCall(acp::ToolCall),
    ToolCallUpdate(acp_thread::ToolCallUpdate),
    ToolCallAuthorization(ToolCallAuthorization),
    TitleUpdate(SharedString),
    Retry(acp_thread::RetryStatus),
    Stop(acp::StopReason),
}
 483
/// A permission request attached to a tool call.
///
/// NOTE(review): presumably the receiver of this event picks one of `options` and sends
/// its ID through `response` — confirm against the code that creates and awaits it.
#[derive(Debug)]
pub struct ToolCallAuthorization {
    /// The tool call the request is about.
    pub tool_call: acp::ToolCallUpdate,
    /// The permission options offered.
    pub options: Vec<acp::PermissionOption>,
    /// One-shot channel carrying the ID of a permission option.
    pub response: oneshot::Sender<acp::PermissionOptionId>,
}
 490
/// State of a single agent conversation: its message history, registered tools,
/// selected models, and handles to the project it operates on.
pub struct Thread {
    /// Session ID; `Thread::new` generates it as a v4 UUID string.
    id: acp::SessionId,
    prompt_id: PromptId,
    /// Initialized to the creation time in `Thread::new`.
    updated_at: DateTime<Utc>,
    title: Option<SharedString>,
    #[allow(unused)]
    summary: DetailedSummaryState,
    /// The message history, oldest first.
    messages: Vec<Message>,
    /// Initialized from the `preferred_completion_mode` agent setting.
    completion_mode: CompletionMode,
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    running_turn: Option<RunningTurn>,
    /// An agent message not yet part of `messages`; `last_message` prefers it over the
    /// history. NOTE(review): presumably the message currently being streamed — confirm.
    pending_message: Option<AgentMessage>,
    /// Registered tools, keyed by tool name.
    tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
    /// `Thread::resume` refuses to run unless this is set.
    tool_use_limit_reached: bool,
    #[allow(unused)]
    request_token_usage: Vec<TokenUsage>,
    #[allow(unused)]
    cumulative_token_usage: TokenUsage,
    /// Shared task, started in `Thread::new`, that resolves to a snapshot of the project.
    #[allow(unused)]
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    context_server_registry: Entity<ContextServerRegistry>,
    /// Initialized from the `default_profile` agent setting.
    profile_id: AgentProfileId,
    project_context: Entity<ProjectContext>,
    templates: Arc<Templates>,
    model: Option<Arc<dyn LanguageModel>>,
    summarization_model: Option<Arc<dyn LanguageModel>>,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
}
 522
 523impl Thread {
 524    pub fn new(
 525        project: Entity<Project>,
 526        project_context: Entity<ProjectContext>,
 527        context_server_registry: Entity<ContextServerRegistry>,
 528        action_log: Entity<ActionLog>,
 529        templates: Arc<Templates>,
 530        model: Option<Arc<dyn LanguageModel>>,
 531        summarization_model: Option<Arc<dyn LanguageModel>>,
 532        cx: &mut Context<Self>,
 533    ) -> Self {
 534        let profile_id = AgentSettings::get_global(cx).default_profile.clone();
 535        Self {
 536            id: acp::SessionId(uuid::Uuid::new_v4().to_string().into()),
 537            prompt_id: PromptId::new(),
 538            updated_at: Utc::now(),
 539            title: None,
 540            summary: DetailedSummaryState::default(),
 541            messages: Vec::new(),
 542            completion_mode: AgentSettings::get_global(cx).preferred_completion_mode,
 543            running_turn: None,
 544            pending_message: None,
 545            tools: BTreeMap::default(),
 546            tool_use_limit_reached: false,
 547            request_token_usage: Vec::new(),
 548            cumulative_token_usage: TokenUsage::default(),
 549            initial_project_snapshot: {
 550                let project_snapshot = Self::project_snapshot(project.clone(), cx);
 551                cx.foreground_executor()
 552                    .spawn(async move { Some(project_snapshot.await) })
 553                    .shared()
 554            },
 555            context_server_registry,
 556            profile_id,
 557            project_context,
 558            templates,
 559            model,
 560            summarization_model,
 561            project,
 562            action_log,
 563        }
 564    }
 565
    /// Returns the session ID of this thread.
    pub fn id(&self) -> &acp::SessionId {
        &self.id
    }
 569
 570    pub fn replay(
 571        &mut self,
 572        cx: &mut Context<Self>,
 573    ) -> mpsc::UnboundedReceiver<Result<ThreadEvent>> {
 574        let (tx, rx) = mpsc::unbounded();
 575        let stream = ThreadEventStream(tx);
 576        for message in &self.messages {
 577            match message {
 578                Message::User(user_message) => stream.send_user_message(user_message),
 579                Message::Agent(assistant_message) => {
 580                    for content in &assistant_message.content {
 581                        match content {
 582                            AgentMessageContent::Text(text) => stream.send_text(text),
 583                            AgentMessageContent::Thinking { text, .. } => {
 584                                stream.send_thinking(text)
 585                            }
 586                            AgentMessageContent::RedactedThinking(_) => {}
 587                            AgentMessageContent::ToolUse(tool_use) => {
 588                                self.replay_tool_call(
 589                                    tool_use,
 590                                    assistant_message.tool_results.get(&tool_use.id),
 591                                    &stream,
 592                                    cx,
 593                                );
 594                            }
 595                        }
 596                    }
 597                }
 598                Message::Resume => {}
 599            }
 600        }
 601        rx
 602    }
 603
 604    fn replay_tool_call(
 605        &self,
 606        tool_use: &LanguageModelToolUse,
 607        tool_result: Option<&LanguageModelToolResult>,
 608        stream: &ThreadEventStream,
 609        cx: &mut Context<Self>,
 610    ) {
 611        let Some(tool) = self.tools.get(tool_use.name.as_ref()) else {
 612            stream
 613                .0
 614                .unbounded_send(Ok(ThreadEvent::ToolCall(acp::ToolCall {
 615                    id: acp::ToolCallId(tool_use.id.to_string().into()),
 616                    title: tool_use.name.to_string(),
 617                    kind: acp::ToolKind::Other,
 618                    status: acp::ToolCallStatus::Failed,
 619                    content: Vec::new(),
 620                    locations: Vec::new(),
 621                    raw_input: Some(tool_use.input.clone()),
 622                    raw_output: None,
 623                })))
 624                .ok();
 625            return;
 626        };
 627
 628        let title = tool.initial_title(tool_use.input.clone());
 629        let kind = tool.kind();
 630        stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
 631
 632        let output = tool_result
 633            .as_ref()
 634            .and_then(|result| result.output.clone());
 635        if let Some(output) = output.clone() {
 636            let tool_event_stream = ToolCallEventStream::new(
 637                tool_use.id.clone(),
 638                stream.clone(),
 639                Some(self.project.read(cx).fs().clone()),
 640            );
 641            tool.replay(tool_use.input.clone(), output, tool_event_stream, cx)
 642                .log_err();
 643        }
 644
 645        stream.update_tool_call_fields(
 646            &tool_use.id,
 647            acp::ToolCallUpdateFields {
 648                status: Some(acp::ToolCallStatus::Completed),
 649                raw_output: output,
 650                ..Default::default()
 651            },
 652        );
 653    }
 654
 655    /// Create a snapshot of the current project state including git information and unsaved buffers.
 656    fn project_snapshot(
 657        project: Entity<Project>,
 658        cx: &mut Context<Self>,
 659    ) -> Task<Arc<agent::thread::ProjectSnapshot>> {
 660        let git_store = project.read(cx).git_store().clone();
 661        let worktree_snapshots: Vec<_> = project
 662            .read(cx)
 663            .visible_worktrees(cx)
 664            .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
 665            .collect();
 666
 667        cx.spawn(async move |_, cx| {
 668            let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
 669
 670            let mut unsaved_buffers = Vec::new();
 671            cx.update(|app_cx| {
 672                let buffer_store = project.read(app_cx).buffer_store();
 673                for buffer_handle in buffer_store.read(app_cx).buffers() {
 674                    let buffer = buffer_handle.read(app_cx);
 675                    if buffer.is_dirty()
 676                        && let Some(file) = buffer.file()
 677                    {
 678                        let path = file.path().to_string_lossy().to_string();
 679                        unsaved_buffers.push(path);
 680                    }
 681                }
 682            })
 683            .ok();
 684
 685            Arc::new(ProjectSnapshot {
 686                worktree_snapshots,
 687                unsaved_buffer_paths: unsaved_buffers,
 688                timestamp: Utc::now(),
 689            })
 690        })
 691    }
 692
 693    fn worktree_snapshot(
 694        worktree: Entity<project::Worktree>,
 695        git_store: Entity<GitStore>,
 696        cx: &App,
 697    ) -> Task<agent::thread::WorktreeSnapshot> {
 698        cx.spawn(async move |cx| {
 699            // Get worktree path and snapshot
 700            let worktree_info = cx.update(|app_cx| {
 701                let worktree = worktree.read(app_cx);
 702                let path = worktree.abs_path().to_string_lossy().to_string();
 703                let snapshot = worktree.snapshot();
 704                (path, snapshot)
 705            });
 706
 707            let Ok((worktree_path, _snapshot)) = worktree_info else {
 708                return WorktreeSnapshot {
 709                    worktree_path: String::new(),
 710                    git_state: None,
 711                };
 712            };
 713
 714            let git_state = git_store
 715                .update(cx, |git_store, cx| {
 716                    git_store
 717                        .repositories()
 718                        .values()
 719                        .find(|repo| {
 720                            repo.read(cx)
 721                                .abs_path_to_repo_path(&worktree.read(cx).abs_path())
 722                                .is_some()
 723                        })
 724                        .cloned()
 725                })
 726                .ok()
 727                .flatten()
 728                .map(|repo| {
 729                    repo.update(cx, |repo, _| {
 730                        let current_branch =
 731                            repo.branch.as_ref().map(|branch| branch.name().to_owned());
 732                        repo.send_job(None, |state, _| async move {
 733                            let RepositoryState::Local { backend, .. } = state else {
 734                                return GitState {
 735                                    remote_url: None,
 736                                    head_sha: None,
 737                                    current_branch,
 738                                    diff: None,
 739                                };
 740                            };
 741
 742                            let remote_url = backend.remote_url("origin");
 743                            let head_sha = backend.head_sha().await;
 744                            let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
 745
 746                            GitState {
 747                                remote_url,
 748                                head_sha,
 749                                current_branch,
 750                                diff,
 751                            }
 752                        })
 753                    })
 754                });
 755
 756            let git_state = match git_state {
 757                Some(git_state) => match git_state.ok() {
 758                    Some(git_state) => git_state.await.ok(),
 759                    None => None,
 760                },
 761                None => None,
 762            };
 763
 764            WorktreeSnapshot {
 765                worktree_path,
 766                git_state,
 767            }
 768        })
 769    }
 770
    /// Returns the project context entity this thread was created with.
    pub fn project_context(&self) -> &Entity<ProjectContext> {
        &self.project_context
    }
 774
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 778
    /// Returns the action log entity this thread was created with.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 782
    /// Returns the language model, or `None` when no model is set.
    pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
        self.model.as_ref()
    }
 786
    /// Sets the language model and notifies observers of this thread.
    pub fn set_model(&mut self, model: Arc<dyn LanguageModel>, cx: &mut Context<Self>) {
        self.model = Some(model);
        cx.notify()
    }
 791
    /// Sets or clears (`None`) the summarization model and notifies observers of this thread.
    pub fn set_summarization_model(
        &mut self,
        model: Option<Arc<dyn LanguageModel>>,
        cx: &mut Context<Self>,
    ) {
        self.summarization_model = model;
        cx.notify()
    }
 800
    /// Returns the current completion mode.
    pub fn completion_mode(&self) -> CompletionMode {
        self.completion_mode
    }
 804
    /// Sets the completion mode and notifies observers of this thread.
    pub fn set_completion_mode(&mut self, mode: CompletionMode, cx: &mut Context<Self>) {
        self.completion_mode = mode;
        cx.notify()
    }
 809
 810    #[cfg(any(test, feature = "test-support"))]
 811    pub fn last_message(&self) -> Option<Message> {
 812        if let Some(message) = self.pending_message.clone() {
 813            Some(Message::Agent(message))
 814        } else {
 815            self.messages.last().cloned()
 816        }
 817    }
 818
    /// Registers `tool` under its name, replacing any tool already registered with that name.
    pub fn add_tool(&mut self, tool: impl AgentTool) {
        self.tools.insert(tool.name(), tool.erase());
    }
 822
    /// Removes the tool registered under `name`.
    ///
    /// Returns `true` if a tool with that name was registered, `false` otherwise.
    pub fn remove_tool(&mut self, name: &str) -> bool {
        self.tools.remove(name).is_some()
    }
 826
    /// Returns the id of the agent profile this thread uses.
    pub fn profile(&self) -> &AgentProfileId {
        &self.profile_id
    }
 830
    /// Sets the id of the agent profile this thread uses.
    ///
    /// Unlike the other setters on this type, this one does not take a context and
    /// does not call `cx.notify()`.
    pub fn set_profile(&mut self, profile_id: AgentProfileId) {
        self.profile_id = profile_id;
    }
 834
 835    pub fn cancel(&mut self, cx: &mut Context<Self>) {
 836        if let Some(running_turn) = self.running_turn.take() {
 837            running_turn.cancel();
 838        }
 839        self.flush_pending_message(cx);
 840    }
 841
 842    pub fn truncate(&mut self, message_id: UserMessageId, cx: &mut Context<Self>) -> Result<()> {
 843        self.cancel(cx);
 844        let Some(position) = self.messages.iter().position(
 845            |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
 846        ) else {
 847            return Err(anyhow!("Message not found"));
 848        };
 849        self.messages.truncate(position);
 850        cx.notify();
 851        Ok(())
 852    }
 853
 854    pub fn resume(
 855        &mut self,
 856        cx: &mut Context<Self>,
 857    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
 858        anyhow::ensure!(
 859            self.tool_use_limit_reached,
 860            "can only resume after tool use limit is reached"
 861        );
 862
 863        self.messages.push(Message::Resume);
 864        cx.notify();
 865
 866        log::info!("Total messages in thread: {}", self.messages.len());
 867        self.run_turn(cx)
 868    }
 869
 870    /// Sending a message results in the model streaming a response, which could include tool calls.
 871    /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
 872    /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
 873    pub fn send<T>(
 874        &mut self,
 875        id: UserMessageId,
 876        content: impl IntoIterator<Item = T>,
 877        cx: &mut Context<Self>,
 878    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>>
 879    where
 880        T: Into<UserMessageContent>,
 881    {
 882        let model = self.model().context("No language model configured")?;
 883
 884        log::info!("Thread::send called with model: {:?}", model.name());
 885        self.advance_prompt_id();
 886
 887        let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
 888        log::debug!("Thread::send content: {:?}", content);
 889
 890        self.messages
 891            .push(Message::User(UserMessage { id, content }));
 892        cx.notify();
 893
 894        log::info!("Total messages in thread: {}", self.messages.len());
 895        self.run_turn(cx)
 896    }
 897
    /// Starts a new agent turn and returns the channel on which its events are reported.
    ///
    /// Any turn already in progress is cancelled first. The turn itself runs in a spawned
    /// task that loops: build a completion request, stream the completion, wait for the
    /// tool calls it requested, and go around again with the tool results until the model
    /// ends its turn, refuses, hits the token limit, hits the tool use limit, or an error
    /// occurs.
    ///
    /// # Errors
    ///
    /// Fails if no language model is configured. Failures inside the spawned task are not
    /// returned here; they are sent on the event stream instead.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
    ) -> Result<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
        self.cancel(cx);

        let model = self.model.clone().context("No language model configured")?;
        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
        let event_stream = ThreadEventStream(events_tx);
        // Index of the last message at the start of the turn (0 for an empty thread).
        // The thread is truncated back to this index if the model refuses.
        let message_ix = self.messages.len().saturating_sub(1);
        self.tool_use_limit_reached = false;
        self.running_turn = Some(RunningTurn {
            event_stream: event_stream.clone(),
            _task: cx.spawn(async move |this, cx| {
                log::info!("Starting agent turn execution");
                let turn_result: Result<StopReason> = async {
                    // The first request of a turn answers the user prompt; follow-up
                    // requests within the same turn carry tool results.
                    let mut completion_intent = CompletionIntent::UserPrompt;
                    loop {
                        log::debug!(
                            "Building completion request with intent: {:?}",
                            completion_intent
                        );
                        let request = this.update(cx, |this, cx| {
                            this.build_completion_request(completion_intent, cx)
                        })??;

                        log::info!("Calling model.stream_completion");

                        // Out-parameters filled in by `stream_completion_with_retries`.
                        let mut tool_use_limit_reached = false;
                        let mut refused = false;
                        let mut reached_max_tokens = false;
                        let mut tool_uses = Self::stream_completion_with_retries(
                            this.clone(),
                            model.clone(),
                            request,
                            &event_stream,
                            &mut tool_use_limit_reached,
                            &mut refused,
                            &mut reached_max_tokens,
                            cx,
                        )
                        .await?;

                        if refused {
                            return Ok(StopReason::Refusal);
                        } else if reached_max_tokens {
                            return Ok(StopReason::MaxTokens);
                        }

                        // Must be captured before draining `tool_uses` below: a completion
                        // that requested no tools ends the turn.
                        let end_turn = tool_uses.is_empty();
                        while let Some(tool_result) = tool_uses.next().await {
                            log::info!("Tool finished {:?}", tool_result);

                            event_stream.update_tool_call_fields(
                                &tool_result.tool_use_id,
                                acp::ToolCallUpdateFields {
                                    status: Some(if tool_result.is_error {
                                        acp::ToolCallStatus::Failed
                                    } else {
                                        acp::ToolCallStatus::Completed
                                    }),
                                    raw_output: tool_result.output.clone(),
                                    ..Default::default()
                                },
                            );
                            // Record the result on the pending agent message. A failed
                            // update (e.g. the thread entity is gone) is deliberately ignored.
                            this.update(cx, |this, _cx| {
                                this.pending_message()
                                    .tool_results
                                    .insert(tool_result.tool_use_id.clone(), tool_result);
                            })
                            .ok();
                        }

                        if tool_use_limit_reached {
                            log::info!("Tool use limit reached, completing turn");
                            // Remember the limit on the thread so that `resume` is allowed.
                            this.update(cx, |this, _cx| this.tool_use_limit_reached = true)?;
                            return Err(language_model::ToolUseLimitReachedError.into());
                        } else if end_turn {
                            log::info!("No tool uses found, completing turn");
                            return Ok(StopReason::EndTurn);
                        } else {
                            // Commit the agent message with its tool results, then ask the
                            // model to continue from them.
                            this.update(cx, |this, cx| this.flush_pending_message(cx))?;
                            completion_intent = CompletionIntent::ToolResults;
                        }
                    }
                }
                .await;
                // Whatever the outcome, commit whatever was streamed so far.
                _ = this.update(cx, |this, cx| this.flush_pending_message(cx));

                match turn_result {
                    Ok(reason) => {
                        log::info!("Turn execution completed: {:?}", reason);

                        // Title generation is awaited before the stop event is sent;
                        // a failure to generate the title is only logged.
                        let update_title = this
                            .update(cx, |this, cx| this.update_title(&event_stream, cx))
                            .ok()
                            .flatten();
                        if let Some(update_title) = update_title {
                            update_title.await.context("update title failed").log_err();
                        }

                        event_stream.send_stop(reason);
                        if reason == StopReason::Refusal {
                            // Drop the message at `message_ix` and everything after it.
                            _ = this.update(cx, |this, _| this.messages.truncate(message_ix));
                        }
                    }
                    Err(error) => {
                        log::error!("Turn execution failed: {:?}", error);
                        event_stream.send_error(error);
                    }
                }

                // The turn is over; clear the handle stored on the thread.
                _ = this.update(cx, |this, _| this.running_turn.take());
            }),
        });
        Ok(events_rx)
    }
1015
    /// Streams a single completion for `request`, retrying the whole request when the
    /// stream yields a retryable error.
    ///
    /// Streamed events are forwarded to `handle_streamed_completion_event`; the tool tasks
    /// it returns are collected and handed back to the caller once the stream stops.
    ///
    /// The three `&mut bool` parameters are out-parameters:
    /// - `tool_use_limit_reached` is set when the stream reports `ToolUseLimitReached`.
    /// - `refusal` is set when the model stops with `StopReason::Refusal`.
    /// - `max_tokens_reached` is set when the model stops with `StopReason::MaxTokens`.
    ///
    /// In the refusal and max-tokens cases an empty set of tool tasks is returned.
    ///
    /// # Errors
    ///
    /// Returns the error if the request cannot be started, if the completion mode is
    /// `CompletionMode::Normal`, if `retry_strategy_for` yields no strategy for the error,
    /// or if the number of attempts exceeds the strategy's `max_attempts`.
    async fn stream_completion_with_retries(
        this: WeakEntity<Self>,
        model: Arc<dyn LanguageModel>,
        request: LanguageModelRequest,
        event_stream: &ThreadEventStream,
        tool_use_limit_reached: &mut bool,
        refusal: &mut bool,
        max_tokens_reached: &mut bool,
        cx: &mut AsyncApp,
    ) -> Result<FuturesUnordered<Task<LanguageModelToolResult>>> {
        // NOTE(review): this is logged before the first `stream_completion` call below,
        // so "started successfully" is not yet established at this point.
        log::debug!("Stream completion started successfully");

        // Declared outside the retry loop so the count persists across attempts.
        let mut attempt = None;
        'retry: loop {
            let mut events = model.stream_completion(request.clone(), cx).await?;
            // Re-created on every attempt: tool tasks collected before a mid-stream error
            // are not carried over into the retry.
            let mut tool_uses = FuturesUnordered::new();
            while let Some(event) = events.next().await {
                match event {
                    Ok(LanguageModelCompletionEvent::StatusUpdate(
                        CompletionRequestStatus::ToolUseLimitReached,
                    )) => {
                        *tool_use_limit_reached = true;
                    }
                    Ok(LanguageModelCompletionEvent::Stop(StopReason::Refusal)) => {
                        *refusal = true;
                        return Ok(FuturesUnordered::default());
                    }
                    Ok(LanguageModelCompletionEvent::Stop(StopReason::MaxTokens)) => {
                        *max_tokens_reached = true;
                        return Ok(FuturesUnordered::default());
                    }
                    Ok(LanguageModelCompletionEvent::Stop(
                        StopReason::ToolUse | StopReason::EndTurn,
                    )) => break,
                    Ok(event) => {
                        log::trace!("Received completion event: {:?}", event);
                        // A failed update (e.g. the thread entity is gone) is ignored.
                        this.update(cx, |this, cx| {
                            tool_uses.extend(this.handle_streamed_completion_event(
                                event,
                                event_stream,
                                cx,
                            ));
                        })
                        .ok();
                    }
                    Err(error) => {
                        // Errors are never retried in `Normal` completion mode.
                        let completion_mode =
                            this.read_with(cx, |thread, _cx| thread.completion_mode())?;
                        if completion_mode == CompletionMode::Normal {
                            return Err(error.into());
                        }

                        let Some(strategy) = Self::retry_strategy_for(&error) else {
                            return Err(error.into());
                        };

                        let max_attempts = match &strategy {
                            RetryStrategy::ExponentialBackoff { max_attempts, .. } => *max_attempts,
                            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
                        };

                        let attempt = attempt.get_or_insert(0u8);

                        *attempt += 1;

                        let attempt = *attempt;
                        if attempt > max_attempts {
                            return Err(error.into());
                        }

                        let delay = match &strategy {
                            RetryStrategy::ExponentialBackoff { initial_delay, .. } => {
                                // Doubles the delay on every attempt. `as_secs` drops any
                                // sub-second part of `initial_delay`.
                                let delay_secs =
                                    initial_delay.as_secs() * 2u64.pow((attempt - 1) as u32);
                                Duration::from_secs(delay_secs)
                            }
                            RetryStrategy::Fixed { delay, .. } => *delay,
                        };
                        log::debug!("Retry attempt {attempt} with delay {delay:?}");

                        event_stream.send_retry(acp_thread::RetryStatus {
                            last_error: error.to_string().into(),
                            attempt: attempt as usize,
                            max_attempts: max_attempts as usize,
                            started_at: Instant::now(),
                            duration: delay,
                        });

                        // Wait out the delay, then issue the request again from scratch.
                        cx.background_executor().timer(delay).await;
                        continue 'retry;
                    }
                }
            }

            return Ok(tool_uses);
        }
    }
1113
1114    pub fn build_system_message(&self, cx: &App) -> LanguageModelRequestMessage {
1115        log::debug!("Building system message");
1116        let prompt = SystemPromptTemplate {
1117            project: self.project_context.read(cx),
1118            available_tools: self.tools.keys().cloned().collect(),
1119        }
1120        .render(&self.templates)
1121        .context("failed to build system prompt")
1122        .expect("Invalid template");
1123        log::debug!("System message built");
1124        LanguageModelRequestMessage {
1125            role: Role::System,
1126            content: vec![prompt.into()],
1127            cache: true,
1128        }
1129    }
1130
1131    /// A helper method that's called on every streamed completion event.
1132    /// Returns an optional tool result task, which the main agentic loop in
1133    /// send will send back to the model when it resolves.
1134    fn handle_streamed_completion_event(
1135        &mut self,
1136        event: LanguageModelCompletionEvent,
1137        event_stream: &ThreadEventStream,
1138        cx: &mut Context<Self>,
1139    ) -> Option<Task<LanguageModelToolResult>> {
1140        log::trace!("Handling streamed completion event: {:?}", event);
1141        use LanguageModelCompletionEvent::*;
1142
1143        match event {
1144            StartMessage { .. } => {
1145                self.flush_pending_message(cx);
1146                self.pending_message = Some(AgentMessage::default());
1147            }
1148            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
1149            Thinking { text, signature } => {
1150                self.handle_thinking_event(text, signature, event_stream, cx)
1151            }
1152            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
1153            ToolUse(tool_use) => {
1154                return self.handle_tool_use_event(tool_use, event_stream, cx);
1155            }
1156            ToolUseJsonParseError {
1157                id,
1158                tool_name,
1159                raw_input,
1160                json_parse_error,
1161            } => {
1162                return Some(Task::ready(self.handle_tool_use_json_parse_error_event(
1163                    id,
1164                    tool_name,
1165                    raw_input,
1166                    json_parse_error,
1167                )));
1168            }
1169            UsageUpdate(_) | StatusUpdate(_) => {}
1170            Stop(_) => unreachable!(),
1171        }
1172
1173        None
1174    }
1175
1176    fn handle_text_event(
1177        &mut self,
1178        new_text: String,
1179        event_stream: &ThreadEventStream,
1180        cx: &mut Context<Self>,
1181    ) {
1182        event_stream.send_text(&new_text);
1183
1184        let last_message = self.pending_message();
1185        if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
1186            text.push_str(&new_text);
1187        } else {
1188            last_message
1189                .content
1190                .push(AgentMessageContent::Text(new_text));
1191        }
1192
1193        cx.notify();
1194    }
1195
1196    fn handle_thinking_event(
1197        &mut self,
1198        new_text: String,
1199        new_signature: Option<String>,
1200        event_stream: &ThreadEventStream,
1201        cx: &mut Context<Self>,
1202    ) {
1203        event_stream.send_thinking(&new_text);
1204
1205        let last_message = self.pending_message();
1206        if let Some(AgentMessageContent::Thinking { text, signature }) =
1207            last_message.content.last_mut()
1208        {
1209            text.push_str(&new_text);
1210            *signature = new_signature.or(signature.take());
1211        } else {
1212            last_message.content.push(AgentMessageContent::Thinking {
1213                text: new_text,
1214                signature: new_signature,
1215            });
1216        }
1217
1218        cx.notify();
1219    }
1220
1221    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
1222        let last_message = self.pending_message();
1223        last_message
1224            .content
1225            .push(AgentMessageContent::RedactedThinking(data));
1226        cx.notify();
1227    }
1228
1229    fn handle_tool_use_event(
1230        &mut self,
1231        tool_use: LanguageModelToolUse,
1232        event_stream: &ThreadEventStream,
1233        cx: &mut Context<Self>,
1234    ) -> Option<Task<LanguageModelToolResult>> {
1235        cx.notify();
1236
1237        let tool = self.tools.get(tool_use.name.as_ref()).cloned();
1238        let mut title = SharedString::from(&tool_use.name);
1239        let mut kind = acp::ToolKind::Other;
1240        if let Some(tool) = tool.as_ref() {
1241            title = tool.initial_title(tool_use.input.clone());
1242            kind = tool.kind();
1243        }
1244
1245        // Ensure the last message ends in the current tool use
1246        let last_message = self.pending_message();
1247        let push_new_tool_use = last_message.content.last_mut().map_or(true, |content| {
1248            if let AgentMessageContent::ToolUse(last_tool_use) = content {
1249                if last_tool_use.id == tool_use.id {
1250                    *last_tool_use = tool_use.clone();
1251                    false
1252                } else {
1253                    true
1254                }
1255            } else {
1256                true
1257            }
1258        });
1259
1260        if push_new_tool_use {
1261            event_stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
1262            last_message
1263                .content
1264                .push(AgentMessageContent::ToolUse(tool_use.clone()));
1265        } else {
1266            event_stream.update_tool_call_fields(
1267                &tool_use.id,
1268                acp::ToolCallUpdateFields {
1269                    title: Some(title.into()),
1270                    kind: Some(kind),
1271                    raw_input: Some(tool_use.input.clone()),
1272                    ..Default::default()
1273                },
1274            );
1275        }
1276
1277        if !tool_use.is_input_complete {
1278            return None;
1279        }
1280
1281        let Some(tool) = tool else {
1282            let content = format!("No tool named {} exists", tool_use.name);
1283            return Some(Task::ready(LanguageModelToolResult {
1284                content: LanguageModelToolResultContent::Text(Arc::from(content)),
1285                tool_use_id: tool_use.id,
1286                tool_name: tool_use.name,
1287                is_error: true,
1288                output: None,
1289            }));
1290        };
1291
1292        let fs = self.project.read(cx).fs().clone();
1293        let tool_event_stream =
1294            ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
1295        tool_event_stream.update_fields(acp::ToolCallUpdateFields {
1296            status: Some(acp::ToolCallStatus::InProgress),
1297            ..Default::default()
1298        });
1299        let supports_images = self.model().map_or(false, |model| model.supports_images());
1300        let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
1301        log::info!("Running tool {}", tool_use.name);
1302        Some(cx.foreground_executor().spawn(async move {
1303            let tool_result = tool_result.await.and_then(|output| {
1304                if let LanguageModelToolResultContent::Image(_) = &output.llm_output
1305                    && !supports_images
1306                {
1307                    return Err(anyhow!(
1308                        "Attempted to read an image, but this model doesn't support it.",
1309                    ));
1310                }
1311                Ok(output)
1312            });
1313
1314            match tool_result {
1315                Ok(output) => LanguageModelToolResult {
1316                    tool_use_id: tool_use.id,
1317                    tool_name: tool_use.name,
1318                    is_error: false,
1319                    content: output.llm_output,
1320                    output: Some(output.raw_output),
1321                },
1322                Err(error) => LanguageModelToolResult {
1323                    tool_use_id: tool_use.id,
1324                    tool_name: tool_use.name,
1325                    is_error: true,
1326                    content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
1327                    output: None,
1328                },
1329            }
1330        }))
1331    }
1332
1333    fn handle_tool_use_json_parse_error_event(
1334        &mut self,
1335        tool_use_id: LanguageModelToolUseId,
1336        tool_name: Arc<str>,
1337        raw_input: Arc<str>,
1338        json_parse_error: String,
1339    ) -> LanguageModelToolResult {
1340        let tool_output = format!("Error parsing input JSON: {json_parse_error}");
1341        LanguageModelToolResult {
1342            tool_use_id,
1343            tool_name,
1344            is_error: true,
1345            content: LanguageModelToolResultContent::Text(tool_output.into()),
1346            output: Some(serde_json::Value::String(raw_input.to_string())),
1347        }
1348    }
1349
1350    pub fn title(&self) -> SharedString {
1351        self.title.clone().unwrap_or("New Thread".into())
1352    }
1353
1354    fn update_title(
1355        &mut self,
1356        event_stream: &ThreadEventStream,
1357        cx: &mut Context<Self>,
1358    ) -> Option<Task<Result<()>>> {
1359        if self.title.is_some() {
1360            log::debug!("Skipping title generation because we already have one.");
1361            return None;
1362        }
1363
1364        log::info!(
1365            "Generating title with model: {:?}",
1366            self.summarization_model.as_ref().map(|model| model.name())
1367        );
1368        let model = self.summarization_model.clone()?;
1369        let event_stream = event_stream.clone();
1370        let mut request = LanguageModelRequest {
1371            intent: Some(CompletionIntent::ThreadSummarization),
1372            temperature: AgentSettings::temperature_for_model(&model, cx),
1373            ..Default::default()
1374        };
1375
1376        for message in &self.messages {
1377            request.messages.extend(message.to_request());
1378        }
1379
1380        request.messages.push(LanguageModelRequestMessage {
1381            role: Role::User,
1382            content: vec![SUMMARIZE_THREAD_PROMPT.into()],
1383            cache: false,
1384        });
1385        Some(cx.spawn(async move |this, cx| {
1386            let mut title = String::new();
1387            let mut messages = model.stream_completion(request, cx).await?;
1388            while let Some(event) = messages.next().await {
1389                let event = event?;
1390                let text = match event {
1391                    LanguageModelCompletionEvent::Text(text) => text,
1392                    LanguageModelCompletionEvent::StatusUpdate(
1393                        CompletionRequestStatus::UsageUpdated { .. },
1394                    ) => {
1395                        // this.update(cx, |thread, cx| {
1396                        //     thread.update_model_request_usage(amount as u32, limit, cx);
1397                        // })?;
1398                        // TODO: handle usage update
1399                        continue;
1400                    }
1401                    _ => continue,
1402                };
1403
1404                let mut lines = text.lines();
1405                title.extend(lines.next());
1406
1407                // Stop if the LLM generated multiple lines.
1408                if lines.next().is_some() {
1409                    break;
1410                }
1411            }
1412
1413            log::info!("Setting title: {}", title);
1414
1415            this.update(cx, |this, cx| {
1416                let title = SharedString::from(title);
1417                event_stream.send_title_update(title.clone());
1418                this.title = Some(title);
1419                cx.notify();
1420            })
1421        }))
1422    }
1423
1424    fn pending_message(&mut self) -> &mut AgentMessage {
1425        self.pending_message.get_or_insert_default()
1426    }
1427
1428    fn flush_pending_message(&mut self, cx: &mut Context<Self>) {
1429        let Some(mut message) = self.pending_message.take() else {
1430            return;
1431        };
1432
1433        for content in &message.content {
1434            let AgentMessageContent::ToolUse(tool_use) = content else {
1435                continue;
1436            };
1437
1438            if !message.tool_results.contains_key(&tool_use.id) {
1439                message.tool_results.insert(
1440                    tool_use.id.clone(),
1441                    LanguageModelToolResult {
1442                        tool_use_id: tool_use.id.clone(),
1443                        tool_name: tool_use.name.clone(),
1444                        is_error: true,
1445                        content: LanguageModelToolResultContent::Text(TOOL_CANCELED_MESSAGE.into()),
1446                        output: None,
1447                    },
1448                );
1449            }
1450        }
1451
1452        self.messages.push(Message::Agent(message));
1453        self.updated_at = Utc::now();
1454        cx.notify()
1455    }
1456
1457    pub(crate) fn build_completion_request(
1458        &self,
1459        completion_intent: CompletionIntent,
1460        cx: &mut App,
1461    ) -> Result<LanguageModelRequest> {
1462        let model = self.model().context("No language model configured")?;
1463
1464        log::debug!("Building completion request");
1465        log::debug!("Completion intent: {:?}", completion_intent);
1466        log::debug!("Completion mode: {:?}", self.completion_mode);
1467
1468        let messages = self.build_request_messages(cx);
1469        log::info!("Request will include {} messages", messages.len());
1470
1471        let tools = if let Some(tools) = self.tools(cx).log_err() {
1472            tools
1473                .filter_map(|tool| {
1474                    let tool_name = tool.name().to_string();
1475                    log::trace!("Including tool: {}", tool_name);
1476                    Some(LanguageModelRequestTool {
1477                        name: tool_name,
1478                        description: tool.description().to_string(),
1479                        input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1480                    })
1481                })
1482                .collect()
1483        } else {
1484            Vec::new()
1485        };
1486
1487        log::info!("Request includes {} tools", tools.len());
1488
1489        let request = LanguageModelRequest {
1490            thread_id: Some(self.id.to_string()),
1491            prompt_id: Some(self.prompt_id.to_string()),
1492            intent: Some(completion_intent),
1493            mode: Some(self.completion_mode.into()),
1494            messages,
1495            tools,
1496            tool_choice: None,
1497            stop: Vec::new(),
1498            temperature: AgentSettings::temperature_for_model(model, cx),
1499            thinking_allowed: true,
1500        };
1501
1502        log::debug!("Completion request built successfully");
1503        Ok(request)
1504    }
1505
1506    fn tools<'a>(&'a self, cx: &'a App) -> Result<impl Iterator<Item = &'a Arc<dyn AnyAgentTool>>> {
1507        let model = self.model().context("No language model configured")?;
1508
1509        let profile = AgentSettings::get_global(cx)
1510            .profiles
1511            .get(&self.profile_id)
1512            .context("profile not found")?;
1513        let provider_id = model.provider_id();
1514
1515        Ok(self
1516            .tools
1517            .iter()
1518            .filter(move |(_, tool)| tool.supported_provider(&provider_id))
1519            .filter_map(|(tool_name, tool)| {
1520                if profile.is_tool_enabled(tool_name) {
1521                    Some(tool)
1522                } else {
1523                    None
1524                }
1525            })
1526            .chain(self.context_server_registry.read(cx).servers().flat_map(
1527                |(server_id, tools)| {
1528                    tools.iter().filter_map(|(tool_name, tool)| {
1529                        if profile.is_context_server_tool_enabled(&server_id.0, tool_name) {
1530                            Some(tool)
1531                        } else {
1532                            None
1533                        }
1534                    })
1535                },
1536            )))
1537    }
1538
1539    fn build_request_messages(&self, cx: &App) -> Vec<LanguageModelRequestMessage> {
1540        log::trace!(
1541            "Building request messages from {} thread messages",
1542            self.messages.len()
1543        );
1544        let mut messages = vec![self.build_system_message(cx)];
1545        for message in &self.messages {
1546            messages.extend(message.to_request());
1547        }
1548
1549        if let Some(message) = self.pending_message.as_ref() {
1550            messages.extend(message.to_request());
1551        }
1552
1553        if let Some(last_user_message) = messages
1554            .iter_mut()
1555            .rev()
1556            .find(|message| message.role == Role::User)
1557        {
1558            last_user_message.cache = true;
1559        }
1560
1561        messages
1562    }
1563
1564    pub fn to_markdown(&self) -> String {
1565        let mut markdown = String::new();
1566        for (ix, message) in self.messages.iter().enumerate() {
1567            if ix > 0 {
1568                markdown.push('\n');
1569            }
1570            markdown.push_str(&message.to_markdown());
1571        }
1572
1573        if let Some(message) = self.pending_message.as_ref() {
1574            markdown.push('\n');
1575            markdown.push_str(&message.to_markdown());
1576        }
1577
1578        markdown
1579    }
1580
    /// Replaces the thread's prompt id with a freshly created one.
    fn advance_prompt_id(&mut self) {
        self.prompt_id = PromptId::new();
    }
1584
    /// Decides whether, and how, a failed completion request should be retried.
    ///
    /// Returns `None` when the error is treated as non-retryable; otherwise
    /// returns the `RetryStrategy` (delay policy plus `max_attempts`) to apply.
    ///
    /// Arm order matters: the status-specific `HttpResponseError` arms and the
    /// guarded `Other` arm must stay above the catch-all arms at the bottom.
    fn retry_strategy_for(error: &LanguageModelCompletionError) -> Option<RetryStrategy> {
        use LanguageModelCompletionError::*;
        use http_client::StatusCode;

        // General strategy here:
        // - If retrying won't help (e.g. invalid API key or payload too large), return None so we don't retry at all.
        // - If it's a time-based issue (e.g. server overloaded, rate limit exceeded), retry up to MAX_RETRY_ATTEMPTS
        //   times, waiting for the server-provided `retry_after` when there is one (a bare HTTP 429 response uses
        //   exponential backoff instead).
        // - If it's an issue that *might* be fixed by retrying (e.g. internal server error), retry with a smaller
        //   `max_attempts` (between 1 and 3 below).
        match error {
            HttpResponseError {
                status_code: StatusCode::TOO_MANY_REQUESTS,
                ..
            } => Some(RetryStrategy::ExponentialBackoff {
                initial_delay: BASE_RETRY_DELAY,
                max_attempts: MAX_RETRY_ATTEMPTS,
            }),
            ServerOverloaded { retry_after, .. } | RateLimitExceeded { retry_after, .. } => {
                Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    max_attempts: MAX_RETRY_ATTEMPTS,
                })
            }
            UpstreamProviderError {
                status,
                retry_after,
                ..
            } => match *status {
                StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => {
                    Some(RetryStrategy::Fixed {
                        delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                        max_attempts: MAX_RETRY_ATTEMPTS,
                    })
                }
                StatusCode::INTERNAL_SERVER_ERROR => Some(RetryStrategy::Fixed {
                    delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                    // Internal Server Error could be anything, retry up to 3 times.
                    max_attempts: 3,
                }),
                status => {
                    // There is no StatusCode variant for the unofficial HTTP 529 ("The service is overloaded"),
                    // but we frequently get them in practice. See https://http.dev/529
                    if status.as_u16() == 529 {
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: MAX_RETRY_ATTEMPTS,
                        })
                    } else {
                        // Any other upstream status is still retried, with a low max_attempts.
                        Some(RetryStrategy::Fixed {
                            delay: retry_after.unwrap_or(BASE_RETRY_DELAY),
                            max_attempts: 2,
                        })
                    }
                }
            },
            ApiInternalServerError { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            ApiReadResponseError { .. }
            | HttpSend { .. }
            | DeserializeResponse { .. }
            | BadRequestFormat { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 3,
            }),
            // Retrying these errors definitely shouldn't help.
            HttpResponseError {
                status_code:
                    StatusCode::PAYLOAD_TOO_LARGE | StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED,
                ..
            }
            | AuthenticationError { .. }
            | PermissionError { .. }
            | NoApiKey { .. }
            | ApiEndpointNotFound { .. }
            | PromptTooLarge { .. } => None,
            // These errors might be transient, so retry them (max_attempts: 1).
            SerializeRequest { .. } | BuildRequestBody { .. } => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 1,
            }),
            // Retry all other 4xx and 5xx errors (max_attempts: 3).
            HttpResponseError { status_code, .. }
                if status_code.is_client_error() || status_code.is_server_error() =>
            {
                Some(RetryStrategy::Fixed {
                    delay: BASE_RETRY_DELAY,
                    max_attempts: 3,
                })
            }
            Other(err)
                if err.is::<language_model::PaymentRequiredError>()
                    || err.is::<language_model::ModelRequestLimitReachedError>() =>
            {
                // Retrying won't help for Payment Required or Model Request Limit errors (where
                // the user must upgrade to usage-based billing to get more requests, or else wait
                // for a significant amount of time for the request limit to reset).
                None
            }
            // Anything else is unclassified; it is still retried, but with a low max_attempts.
            HttpResponseError { .. } | Other(..) => Some(RetryStrategy::Fixed {
                delay: BASE_RETRY_DELAY,
                max_attempts: 2,
            }),
        }
    }
1691}
1692
/// State kept for a turn that is currently in progress: the task driving it
/// and the event stream it reports on.
struct RunningTurn {
    /// Holds the task that handles agent interaction until the end of the turn.
    /// Survives across multiple requests as the model performs tool calls and
    /// we run tools, report their results.
    _task: Task<()>,
    /// The current event stream for the running turn. Used to report a final
    /// cancellation event if we cancel the turn.
    event_stream: ThreadEventStream,
}
1702
impl RunningTurn {
    /// Consumes the running turn and reports a `Canceled` stop event on its
    /// event stream.
    ///
    /// NOTE(review): `self` is taken by value, so `_task` is dropped when this
    /// returns; presumably dropping the task is what stops the in-flight work.
    /// Confirm against gpui's `Task` drop semantics.
    fn cancel(self) {
        log::debug!("Cancelling in progress turn");
        self.event_stream.send_canceled();
    }
}
1709
1710pub trait AgentTool
1711where
1712    Self: 'static + Sized,
1713{
1714    type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
1715    type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
1716
1717    fn name(&self) -> SharedString;
1718
1719    fn description(&self) -> SharedString {
1720        let schema = schemars::schema_for!(Self::Input);
1721        SharedString::new(
1722            schema
1723                .get("description")
1724                .and_then(|description| description.as_str())
1725                .unwrap_or_default(),
1726        )
1727    }
1728
1729    fn kind(&self) -> acp::ToolKind;
1730
1731    /// The initial tool title to display. Can be updated during the tool run.
1732    fn initial_title(&self, input: Result<Self::Input, serde_json::Value>) -> SharedString;
1733
1734    /// Returns the JSON schema that describes the tool's input.
1735    fn input_schema(&self) -> Schema {
1736        schemars::schema_for!(Self::Input)
1737    }
1738
1739    /// Some tools rely on a provider for the underlying billing or other reasons.
1740    /// Allow the tool to check if they are compatible, or should be filtered out.
1741    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
1742        true
1743    }
1744
1745    /// Runs the tool with the provided input.
1746    fn run(
1747        self: Arc<Self>,
1748        input: Self::Input,
1749        event_stream: ToolCallEventStream,
1750        cx: &mut App,
1751    ) -> Task<Result<Self::Output>>;
1752
1753    /// Emits events for a previous execution of the tool.
1754    fn replay(
1755        &self,
1756        _input: Self::Input,
1757        _output: Self::Output,
1758        _event_stream: ToolCallEventStream,
1759        _cx: &mut App,
1760    ) -> Result<()> {
1761        Ok(())
1762    }
1763
1764    fn erase(self) -> Arc<dyn AnyAgentTool> {
1765        Arc::new(Erased(Arc::new(self)))
1766    }
1767}
1768
/// Wrapper produced by [`AgentTool::erase`]; `Erased<Arc<T>>` implements
/// [`AnyAgentTool`] for any `T: AgentTool`.
pub struct Erased<T>(T);
1770
/// The result of running a tool through [`AnyAgentTool::run`].
pub struct AgentToolOutput {
    /// The tool output converted into tool-result content for the language model.
    pub llm_output: LanguageModelToolResultContent,
    /// The tool output serialized to JSON.
    pub raw_output: serde_json::Value,
}
1775
/// JSON-based counterpart of [`AgentTool`]: inputs and outputs are passed as
/// `serde_json::Value` instead of the tool's own types.
pub trait AnyAgentTool {
    /// The tool's name.
    fn name(&self) -> SharedString;
    /// The tool's description.
    fn description(&self) -> SharedString;
    /// The ACP tool kind reported for this tool.
    fn kind(&self) -> acp::ToolKind;
    /// The initial tool title to display for the given raw JSON input.
    fn initial_title(&self, input: serde_json::Value) -> SharedString;
    /// Returns the tool's input schema as JSON for the requested schema format.
    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
    /// Whether the tool can be used with the given provider; accepts every provider by default.
    fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
        true
    }
    /// Runs the tool with the provided raw JSON input.
    fn run(
        self: Arc<Self>,
        input: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Task<Result<AgentToolOutput>>;
    /// Emits events for a previous execution of the tool, given its raw JSON input and output.
    fn replay(
        &self,
        input: serde_json::Value,
        output: serde_json::Value,
        event_stream: ToolCallEventStream,
        cx: &mut App,
    ) -> Result<()>;
}
1799
1800impl<T> AnyAgentTool for Erased<Arc<T>>
1801where
1802    T: AgentTool,
1803{
1804    fn name(&self) -> SharedString {
1805        self.0.name()
1806    }
1807
1808    fn description(&self) -> SharedString {
1809        self.0.description()
1810    }
1811
1812    fn kind(&self) -> agent_client_protocol::ToolKind {
1813        self.0.kind()
1814    }
1815
1816    fn initial_title(&self, input: serde_json::Value) -> SharedString {
1817        let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
1818        self.0.initial_title(parsed_input)
1819    }
1820
1821    fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
1822        let mut json = serde_json::to_value(self.0.input_schema())?;
1823        adapt_schema_to_format(&mut json, format)?;
1824        Ok(json)
1825    }
1826
1827    fn supported_provider(&self, provider: &LanguageModelProviderId) -> bool {
1828        self.0.supported_provider(provider)
1829    }
1830
1831    fn run(
1832        self: Arc<Self>,
1833        input: serde_json::Value,
1834        event_stream: ToolCallEventStream,
1835        cx: &mut App,
1836    ) -> Task<Result<AgentToolOutput>> {
1837        cx.spawn(async move |cx| {
1838            let input = serde_json::from_value(input)?;
1839            let output = cx
1840                .update(|cx| self.0.clone().run(input, event_stream, cx))?
1841                .await?;
1842            let raw_output = serde_json::to_value(&output)?;
1843            Ok(AgentToolOutput {
1844                llm_output: output.into(),
1845                raw_output,
1846            })
1847        })
1848    }
1849
1850    fn replay(
1851        &self,
1852        input: serde_json::Value,
1853        output: serde_json::Value,
1854        event_stream: ToolCallEventStream,
1855        cx: &mut App,
1856    ) -> Result<()> {
1857        let input = serde_json::from_value(input)?;
1858        let output = serde_json::from_value(output)?;
1859        self.0.replay(input, output, event_stream, cx)
1860    }
1861}
1862
/// Cloneable sending half of an unbounded channel of `Result<ThreadEvent>` items.
#[derive(Clone)]
struct ThreadEventStream(mpsc::UnboundedSender<Result<ThreadEvent>>);
1865
1866impl ThreadEventStream {
1867    fn send_title_update(&self, text: SharedString) {
1868        self.0
1869            .unbounded_send(Ok(ThreadEvent::TitleUpdate(text)))
1870            .ok();
1871    }
1872
1873    fn send_user_message(&self, message: &UserMessage) {
1874        self.0
1875            .unbounded_send(Ok(ThreadEvent::UserMessage(message.clone())))
1876            .ok();
1877    }
1878
1879    fn send_text(&self, text: &str) {
1880        self.0
1881            .unbounded_send(Ok(ThreadEvent::AgentText(text.to_string())))
1882            .ok();
1883    }
1884
1885    fn send_thinking(&self, text: &str) {
1886        self.0
1887            .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))
1888            .ok();
1889    }
1890
1891    fn send_tool_call(
1892        &self,
1893        id: &LanguageModelToolUseId,
1894        title: SharedString,
1895        kind: acp::ToolKind,
1896        input: serde_json::Value,
1897    ) {
1898        self.0
1899            .unbounded_send(Ok(ThreadEvent::ToolCall(Self::initial_tool_call(
1900                id,
1901                title.to_string(),
1902                kind,
1903                input,
1904            ))))
1905            .ok();
1906    }
1907
1908    fn initial_tool_call(
1909        id: &LanguageModelToolUseId,
1910        title: String,
1911        kind: acp::ToolKind,
1912        input: serde_json::Value,
1913    ) -> acp::ToolCall {
1914        acp::ToolCall {
1915            id: acp::ToolCallId(id.to_string().into()),
1916            title,
1917            kind,
1918            status: acp::ToolCallStatus::Pending,
1919            content: vec![],
1920            locations: vec![],
1921            raw_input: Some(input),
1922            raw_output: None,
1923        }
1924    }
1925
1926    fn update_tool_call_fields(
1927        &self,
1928        tool_use_id: &LanguageModelToolUseId,
1929        fields: acp::ToolCallUpdateFields,
1930    ) {
1931        self.0
1932            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
1933                acp::ToolCallUpdate {
1934                    id: acp::ToolCallId(tool_use_id.to_string().into()),
1935                    fields,
1936                }
1937                .into(),
1938            )))
1939            .ok();
1940    }
1941
1942    fn send_retry(&self, status: acp_thread::RetryStatus) {
1943        self.0.unbounded_send(Ok(ThreadEvent::Retry(status))).ok();
1944    }
1945
1946    fn send_stop(&self, reason: StopReason) {
1947        match reason {
1948            StopReason::EndTurn => {
1949                self.0
1950                    .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::EndTurn)))
1951                    .ok();
1952            }
1953            StopReason::MaxTokens => {
1954                self.0
1955                    .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::MaxTokens)))
1956                    .ok();
1957            }
1958            StopReason::Refusal => {
1959                self.0
1960                    .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Refusal)))
1961                    .ok();
1962            }
1963            StopReason::ToolUse => {}
1964        }
1965    }
1966
1967    fn send_canceled(&self) {
1968        self.0
1969            .unbounded_send(Ok(ThreadEvent::Stop(acp::StopReason::Canceled)))
1970            .ok();
1971    }
1972
1973    fn send_error(&self, error: impl Into<anyhow::Error>) {
1974        self.0.unbounded_send(Err(error.into())).ok();
1975    }
1976}
1977
/// Event stream scoped to a single tool call: events sent through it are
/// tagged with that call's tool-use ID.
#[derive(Clone)]
pub struct ToolCallEventStream {
    /// ID of the tool use this stream reports on.
    tool_use_id: LanguageModelToolUseId,
    /// Underlying thread event stream the events are sent to.
    stream: ThreadEventStream,
    /// Filesystem used to persist the "always allow" choice to the settings
    /// file; when `None`, that choice is not persisted.
    fs: Option<Arc<dyn Fs>>,
}
1984
1985impl ToolCallEventStream {
1986    #[cfg(test)]
1987    pub fn test() -> (Self, ToolCallEventStreamReceiver) {
1988        let (events_tx, events_rx) = mpsc::unbounded::<Result<ThreadEvent>>();
1989
1990        let stream = ToolCallEventStream::new("test_id".into(), ThreadEventStream(events_tx), None);
1991
1992        (stream, ToolCallEventStreamReceiver(events_rx))
1993    }
1994
1995    fn new(
1996        tool_use_id: LanguageModelToolUseId,
1997        stream: ThreadEventStream,
1998        fs: Option<Arc<dyn Fs>>,
1999    ) -> Self {
2000        Self {
2001            tool_use_id,
2002            stream,
2003            fs,
2004        }
2005    }
2006
2007    pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
2008        self.stream
2009            .update_tool_call_fields(&self.tool_use_id, fields);
2010    }
2011
2012    pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
2013        self.stream
2014            .0
2015            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2016                acp_thread::ToolCallUpdateDiff {
2017                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2018                    diff,
2019                }
2020                .into(),
2021            )))
2022            .ok();
2023    }
2024
2025    pub fn update_terminal(&self, terminal: Entity<acp_thread::Terminal>) {
2026        self.stream
2027            .0
2028            .unbounded_send(Ok(ThreadEvent::ToolCallUpdate(
2029                acp_thread::ToolCallUpdateTerminal {
2030                    id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2031                    terminal,
2032                }
2033                .into(),
2034            )))
2035            .ok();
2036    }
2037
2038    pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
2039        if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
2040            return Task::ready(Ok(()));
2041        }
2042
2043        let (response_tx, response_rx) = oneshot::channel();
2044        self.stream
2045            .0
2046            .unbounded_send(Ok(ThreadEvent::ToolCallAuthorization(
2047                ToolCallAuthorization {
2048                    tool_call: acp::ToolCallUpdate {
2049                        id: acp::ToolCallId(self.tool_use_id.to_string().into()),
2050                        fields: acp::ToolCallUpdateFields {
2051                            title: Some(title.into()),
2052                            ..Default::default()
2053                        },
2054                    },
2055                    options: vec![
2056                        acp::PermissionOption {
2057                            id: acp::PermissionOptionId("always_allow".into()),
2058                            name: "Always Allow".into(),
2059                            kind: acp::PermissionOptionKind::AllowAlways,
2060                        },
2061                        acp::PermissionOption {
2062                            id: acp::PermissionOptionId("allow".into()),
2063                            name: "Allow".into(),
2064                            kind: acp::PermissionOptionKind::AllowOnce,
2065                        },
2066                        acp::PermissionOption {
2067                            id: acp::PermissionOptionId("deny".into()),
2068                            name: "Deny".into(),
2069                            kind: acp::PermissionOptionKind::RejectOnce,
2070                        },
2071                    ],
2072                    response: response_tx,
2073                },
2074            )))
2075            .ok();
2076        let fs = self.fs.clone();
2077        cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
2078            "always_allow" => {
2079                if let Some(fs) = fs.clone() {
2080                    cx.update(|cx| {
2081                        update_settings_file::<AgentSettings>(fs, cx, |settings, _| {
2082                            settings.set_always_allow_tool_actions(true);
2083                        });
2084                    })?;
2085                }
2086
2087                Ok(())
2088            }
2089            "allow" => Ok(()),
2090            _ => Err(anyhow!("Permission to run tool denied by user")),
2091        })
2092    }
2093}
2094
/// Test-only receiving end of the channel created by `ToolCallEventStream::test`.
#[cfg(test)]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<ThreadEvent>>);
2097
#[cfg(test)]
impl ToolCallEventStreamReceiver {
    /// Waits for the next event and returns it if it is a tool call
    /// authorization request.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including the end of the stream.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallAuthorization(auth))) => auth,
            other => panic!("Expected ToolCallAuthorization but got: {:?}", other),
        }
    }

    /// Waits for the next event and returns its terminal if it is a terminal
    /// tool call update.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including the end of the stream.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(ThreadEvent::ToolCallUpdate(
                acp_thread::ToolCallUpdate::UpdateTerminal(update),
            ))) => update.terminal,
            other => panic!("Expected terminal but got: {:?}", other),
        }
    }
}
2121
/// Exposes the wrapped receiver so tests can call its methods directly.
#[cfg(test)]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<ThreadEvent>>;

    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2130
/// Gives tests mutable access to the wrapped receiver.
#[cfg(test)]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
2137
2138impl From<&str> for UserMessageContent {
2139    fn from(text: &str) -> Self {
2140        Self::Text(text.into())
2141    }
2142}
2143
2144impl From<acp::ContentBlock> for UserMessageContent {
2145    fn from(value: acp::ContentBlock) -> Self {
2146        match value {
2147            acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
2148            acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
2149            acp::ContentBlock::Audio(_) => {
2150                // TODO
2151                Self::Text("[audio]".to_string())
2152            }
2153            acp::ContentBlock::ResourceLink(resource_link) => {
2154                match MentionUri::parse(&resource_link.uri) {
2155                    Ok(uri) => Self::Mention {
2156                        uri,
2157                        content: String::new(),
2158                    },
2159                    Err(err) => {
2160                        log::error!("Failed to parse mention link: {}", err);
2161                        Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
2162                    }
2163                }
2164            }
2165            acp::ContentBlock::Resource(resource) => match resource.resource {
2166                acp::EmbeddedResourceResource::TextResourceContents(resource) => {
2167                    match MentionUri::parse(&resource.uri) {
2168                        Ok(uri) => Self::Mention {
2169                            uri,
2170                            content: resource.text,
2171                        },
2172                        Err(err) => {
2173                            log::error!("Failed to parse mention link: {}", err);
2174                            Self::Text(
2175                                MarkdownCodeBlock {
2176                                    tag: &resource.uri,
2177                                    text: &resource.text,
2178                                }
2179                                .to_string(),
2180                            )
2181                        }
2182                    }
2183                }
2184                acp::EmbeddedResourceResource::BlobResourceContents(_) => {
2185                    // TODO
2186                    Self::Text("[blob]".to_string())
2187                }
2188            },
2189        }
2190    }
2191}
2192
2193impl From<UserMessageContent> for acp::ContentBlock {
2194    fn from(content: UserMessageContent) -> Self {
2195        match content {
2196            UserMessageContent::Text(text) => acp::ContentBlock::Text(acp::TextContent {
2197                text,
2198                annotations: None,
2199            }),
2200            UserMessageContent::Image(image) => acp::ContentBlock::Image(acp::ImageContent {
2201                data: image.source.to_string(),
2202                mime_type: "image/png".to_string(),
2203                annotations: None,
2204                uri: None,
2205            }),
2206            UserMessageContent::Mention { uri, content } => {
2207                acp::ContentBlock::ResourceLink(acp::ResourceLink {
2208                    uri: uri.to_uri().to_string(),
2209                    name: uri.name(),
2210                    annotations: None,
2211                    description: if content.is_empty() {
2212                        None
2213                    } else {
2214                        Some(content)
2215                    },
2216                    mime_type: None,
2217                    size: None,
2218                    title: None,
2219                })
2220            }
2221        }
2222    }
2223}
2224
2225fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
2226    LanguageModelImage {
2227        source: image_content.data.into(),
2228        // TODO: make this optional?
2229        size: gpui::Size::new(0.into(), 0.into()),
2230    }
2231}