acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use agent_settings::AgentSettings;
   7use collections::HashSet;
   8pub use connection::*;
   9pub use diff::*;
  10use language::language_settings::FormatOnSave;
  11pub use mention::*;
  12use project::lsp_store::{FormatTrigger, LspFormatTarget};
  13use serde::{Deserialize, Serialize};
  14use settings::Settings as _;
  15use task::{Shell, ShellBuilder};
  16pub use terminal::*;
  17
  18use action_log::ActionLog;
  19use agent_client_protocol::{self as acp};
  20use anyhow::{Context as _, Result, anyhow};
  21use editor::Bias;
  22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  24use itertools::Itertools;
  25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  26use markdown::Markdown;
  27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  28use std::collections::HashMap;
  29use std::error::Error;
  30use std::fmt::{Formatter, Write};
  31use std::ops::Range;
  32use std::process::ExitStatus;
  33use std::rc::Rc;
  34use std::time::{Duration, Instant};
  35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  36use ui::App;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
  40#[derive(Debug)]
  41pub struct UserMessage {
  42    pub id: Option<UserMessageId>,
  43    pub content: ContentBlock,
  44    pub chunks: Vec<acp::ContentBlock>,
  45    pub checkpoint: Option<Checkpoint>,
  46}
  47
  48#[derive(Debug)]
  49pub struct Checkpoint {
  50    git_checkpoint: GitStoreCheckpoint,
  51    pub show: bool,
  52}
  53
  54impl UserMessage {
  55    fn to_markdown(&self, cx: &App) -> String {
  56        let mut markdown = String::new();
  57        if self
  58            .checkpoint
  59            .as_ref()
  60            .is_some_and(|checkpoint| checkpoint.show)
  61        {
  62            writeln!(markdown, "## User (checkpoint)").unwrap();
  63        } else {
  64            writeln!(markdown, "## User").unwrap();
  65        }
  66        writeln!(markdown).unwrap();
  67        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  68        writeln!(markdown).unwrap();
  69        markdown
  70    }
  71}
  72
  73#[derive(Debug, PartialEq)]
  74pub struct AssistantMessage {
  75    pub chunks: Vec<AssistantMessageChunk>,
  76}
  77
  78impl AssistantMessage {
  79    pub fn to_markdown(&self, cx: &App) -> String {
  80        format!(
  81            "## Assistant\n\n{}\n\n",
  82            self.chunks
  83                .iter()
  84                .map(|chunk| chunk.to_markdown(cx))
  85                .join("\n\n")
  86        )
  87    }
  88}
  89
  90#[derive(Debug, PartialEq)]
  91pub enum AssistantMessageChunk {
  92    Message { block: ContentBlock },
  93    Thought { block: ContentBlock },
  94}
  95
  96impl AssistantMessageChunk {
  97    pub fn from_str(
  98        chunk: &str,
  99        language_registry: &Arc<LanguageRegistry>,
 100        path_style: PathStyle,
 101        cx: &mut App,
 102    ) -> Self {
 103        Self::Message {
 104            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 105        }
 106    }
 107
 108    fn to_markdown(&self, cx: &App) -> String {
 109        match self {
 110            Self::Message { block } => block.to_markdown(cx).to_string(),
 111            Self::Thought { block } => {
 112                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 113            }
 114        }
 115    }
 116}
 117
 118#[derive(Debug)]
 119pub enum AgentThreadEntry {
 120    UserMessage(UserMessage),
 121    AssistantMessage(AssistantMessage),
 122    ToolCall(ToolCall),
 123}
 124
 125impl AgentThreadEntry {
 126    pub fn to_markdown(&self, cx: &App) -> String {
 127        match self {
 128            Self::UserMessage(message) => message.to_markdown(cx),
 129            Self::AssistantMessage(message) => message.to_markdown(cx),
 130            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 131        }
 132    }
 133
 134    pub fn user_message(&self) -> Option<&UserMessage> {
 135        if let AgentThreadEntry::UserMessage(message) = self {
 136            Some(message)
 137        } else {
 138            None
 139        }
 140    }
 141
 142    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 143        if let AgentThreadEntry::ToolCall(call) = self {
 144            itertools::Either::Left(call.diffs())
 145        } else {
 146            itertools::Either::Right(std::iter::empty())
 147        }
 148    }
 149
 150    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 151        if let AgentThreadEntry::ToolCall(call) = self {
 152            itertools::Either::Left(call.terminals())
 153        } else {
 154            itertools::Either::Right(std::iter::empty())
 155        }
 156    }
 157
 158    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 159        if let AgentThreadEntry::ToolCall(ToolCall {
 160            locations,
 161            resolved_locations,
 162            ..
 163        }) = self
 164        {
 165            Some((
 166                locations.get(ix)?.clone(),
 167                resolved_locations.get(ix)?.clone()?,
 168            ))
 169        } else {
 170            None
 171        }
 172    }
 173}
 174
 175#[derive(Debug)]
 176pub struct ToolCall {
 177    pub id: acp::ToolCallId,
 178    pub label: Entity<Markdown>,
 179    pub kind: acp::ToolKind,
 180    pub content: Vec<ToolCallContent>,
 181    pub status: ToolCallStatus,
 182    pub locations: Vec<acp::ToolCallLocation>,
 183    pub resolved_locations: Vec<Option<AgentLocation>>,
 184    pub raw_input: Option<serde_json::Value>,
 185    pub raw_output: Option<serde_json::Value>,
 186}
 187
 188impl ToolCall {
 189    fn from_acp(
 190        tool_call: acp::ToolCall,
 191        status: ToolCallStatus,
 192        language_registry: Arc<LanguageRegistry>,
 193        path_style: PathStyle,
 194        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 195        cx: &mut App,
 196    ) -> Result<Self> {
 197        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 198            first_line.to_owned() + ""
 199        } else {
 200            tool_call.title
 201        };
 202        let mut content = Vec::with_capacity(tool_call.content.len());
 203        for item in tool_call.content {
 204            content.push(ToolCallContent::from_acp(
 205                item,
 206                language_registry.clone(),
 207                path_style,
 208                terminals,
 209                cx,
 210            )?);
 211        }
 212
 213        let result = Self {
 214            id: tool_call.id,
 215            label: cx
 216                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 217            kind: tool_call.kind,
 218            content,
 219            locations: tool_call.locations,
 220            resolved_locations: Vec::default(),
 221            status,
 222            raw_input: tool_call.raw_input,
 223            raw_output: tool_call.raw_output,
 224        };
 225        Ok(result)
 226    }
 227
 228    fn update_fields(
 229        &mut self,
 230        fields: acp::ToolCallUpdateFields,
 231        language_registry: Arc<LanguageRegistry>,
 232        path_style: PathStyle,
 233        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 234        cx: &mut App,
 235    ) -> Result<()> {
 236        let acp::ToolCallUpdateFields {
 237            kind,
 238            status,
 239            title,
 240            content,
 241            locations,
 242            raw_input,
 243            raw_output,
 244        } = fields;
 245
 246        if let Some(kind) = kind {
 247            self.kind = kind;
 248        }
 249
 250        if let Some(status) = status {
 251            self.status = status.into();
 252        }
 253
 254        if let Some(title) = title {
 255            self.label.update(cx, |label, cx| {
 256                if let Some((first_line, _)) = title.split_once("\n") {
 257                    label.replace(first_line.to_owned() + "", cx)
 258                } else {
 259                    label.replace(title, cx);
 260                }
 261            });
 262        }
 263
 264        if let Some(content) = content {
 265            let new_content_len = content.len();
 266            let mut content = content.into_iter();
 267
 268            // Reuse existing content if we can
 269            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 270                old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 271            }
 272            for new in content {
 273                self.content.push(ToolCallContent::from_acp(
 274                    new,
 275                    language_registry.clone(),
 276                    path_style,
 277                    terminals,
 278                    cx,
 279                )?)
 280            }
 281            self.content.truncate(new_content_len);
 282        }
 283
 284        if let Some(locations) = locations {
 285            self.locations = locations;
 286        }
 287
 288        if let Some(raw_input) = raw_input {
 289            self.raw_input = Some(raw_input);
 290        }
 291
 292        if let Some(raw_output) = raw_output {
 293            if self.content.is_empty()
 294                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 295            {
 296                self.content
 297                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 298                        markdown,
 299                    }));
 300            }
 301            self.raw_output = Some(raw_output);
 302        }
 303        Ok(())
 304    }
 305
 306    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Diff(diff) => Some(diff),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Terminal(_) => None,
 311        })
 312    }
 313
 314    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 315        self.content.iter().filter_map(|content| match content {
 316            ToolCallContent::Terminal(terminal) => Some(terminal),
 317            ToolCallContent::ContentBlock(_) => None,
 318            ToolCallContent::Diff(_) => None,
 319        })
 320    }
 321
 322    fn to_markdown(&self, cx: &App) -> String {
 323        let mut markdown = format!(
 324            "**Tool Call: {}**\nStatus: {}\n\n",
 325            self.label.read(cx).source(),
 326            self.status
 327        );
 328        for content in &self.content {
 329            markdown.push_str(content.to_markdown(cx).as_str());
 330            markdown.push_str("\n\n");
 331        }
 332        markdown
 333    }
 334
 335    async fn resolve_location(
 336        location: acp::ToolCallLocation,
 337        project: WeakEntity<Project>,
 338        cx: &mut AsyncApp,
 339    ) -> Option<ResolvedLocation> {
 340        let buffer = project
 341            .update(cx, |project, cx| {
 342                project
 343                    .project_path_for_absolute_path(&location.path, cx)
 344                    .map(|path| project.open_buffer(path, cx))
 345            })
 346            .ok()??;
 347        let buffer = buffer.await.log_err()?;
 348        let position = buffer
 349            .update(cx, |buffer, _| {
 350                if let Some(row) = location.line {
 351                    let snapshot = buffer.snapshot();
 352                    let column = snapshot.indent_size_for_line(row).len;
 353                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 354                    snapshot.anchor_before(point)
 355                } else {
 356                    Anchor::MIN
 357                }
 358            })
 359            .ok()?;
 360
 361        Some(ResolvedLocation { buffer, position })
 362    }
 363
 364    fn resolve_locations(
 365        &self,
 366        project: Entity<Project>,
 367        cx: &mut App,
 368    ) -> Task<Vec<Option<ResolvedLocation>>> {
 369        let locations = self.locations.clone();
 370        project.update(cx, |_, cx| {
 371            cx.spawn(async move |project, cx| {
 372                let mut new_locations = Vec::new();
 373                for location in locations {
 374                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 375                }
 376                new_locations
 377            })
 378        })
 379    }
 380}
 381
 382// Separate so we can hold a strong reference to the buffer
 383// for saving on the thread
 384#[derive(Clone, Debug, PartialEq, Eq)]
 385struct ResolvedLocation {
 386    buffer: Entity<Buffer>,
 387    position: Anchor,
 388}
 389
 390impl From<&ResolvedLocation> for AgentLocation {
 391    fn from(value: &ResolvedLocation) -> Self {
 392        Self {
 393            buffer: value.buffer.downgrade(),
 394            position: value.position,
 395        }
 396    }
 397}
 398
 399#[derive(Debug)]
 400pub enum ToolCallStatus {
 401    /// The tool call hasn't started running yet, but we start showing it to
 402    /// the user.
 403    Pending,
 404    /// The tool call is waiting for confirmation from the user.
 405    WaitingForConfirmation {
 406        options: Vec<acp::PermissionOption>,
 407        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 408    },
 409    /// The tool call is currently running.
 410    InProgress,
 411    /// The tool call completed successfully.
 412    Completed,
 413    /// The tool call failed.
 414    Failed,
 415    /// The user rejected the tool call.
 416    Rejected,
 417    /// The user canceled generation so the tool call was canceled.
 418    Canceled,
 419}
 420
 421impl From<acp::ToolCallStatus> for ToolCallStatus {
 422    fn from(status: acp::ToolCallStatus) -> Self {
 423        match status {
 424            acp::ToolCallStatus::Pending => Self::Pending,
 425            acp::ToolCallStatus::InProgress => Self::InProgress,
 426            acp::ToolCallStatus::Completed => Self::Completed,
 427            acp::ToolCallStatus::Failed => Self::Failed,
 428        }
 429    }
 430}
 431
 432impl Display for ToolCallStatus {
 433    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 434        write!(
 435            f,
 436            "{}",
 437            match self {
 438                ToolCallStatus::Pending => "Pending",
 439                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 440                ToolCallStatus::InProgress => "In Progress",
 441                ToolCallStatus::Completed => "Completed",
 442                ToolCallStatus::Failed => "Failed",
 443                ToolCallStatus::Rejected => "Rejected",
 444                ToolCallStatus::Canceled => "Canceled",
 445            }
 446        )
 447    }
 448}
 449
 450#[derive(Debug, PartialEq, Clone)]
 451pub enum ContentBlock {
 452    Empty,
 453    Markdown { markdown: Entity<Markdown> },
 454    ResourceLink { resource_link: acp::ResourceLink },
 455}
 456
 457impl ContentBlock {
 458    pub fn new(
 459        block: acp::ContentBlock,
 460        language_registry: &Arc<LanguageRegistry>,
 461        path_style: PathStyle,
 462        cx: &mut App,
 463    ) -> Self {
 464        let mut this = Self::Empty;
 465        this.append(block, language_registry, path_style, cx);
 466        this
 467    }
 468
 469    pub fn new_combined(
 470        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 471        language_registry: Arc<LanguageRegistry>,
 472        path_style: PathStyle,
 473        cx: &mut App,
 474    ) -> Self {
 475        let mut this = Self::Empty;
 476        for block in blocks {
 477            this.append(block, &language_registry, path_style, cx);
 478        }
 479        this
 480    }
 481
 482    pub fn append(
 483        &mut self,
 484        block: acp::ContentBlock,
 485        language_registry: &Arc<LanguageRegistry>,
 486        path_style: PathStyle,
 487        cx: &mut App,
 488    ) {
 489        if matches!(self, ContentBlock::Empty)
 490            && let acp::ContentBlock::ResourceLink(resource_link) = block
 491        {
 492            *self = ContentBlock::ResourceLink { resource_link };
 493            return;
 494        }
 495
 496        let new_content = self.block_string_contents(block, path_style);
 497
 498        match self {
 499            ContentBlock::Empty => {
 500                *self = Self::create_markdown_block(new_content, language_registry, cx);
 501            }
 502            ContentBlock::Markdown { markdown } => {
 503                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 504            }
 505            ContentBlock::ResourceLink { resource_link } => {
 506                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 507                let combined = format!("{}\n{}", existing_content, new_content);
 508
 509                *self = Self::create_markdown_block(combined, language_registry, cx);
 510            }
 511        }
 512    }
 513
 514    fn create_markdown_block(
 515        content: String,
 516        language_registry: &Arc<LanguageRegistry>,
 517        cx: &mut App,
 518    ) -> ContentBlock {
 519        ContentBlock::Markdown {
 520            markdown: cx
 521                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 522        }
 523    }
 524
 525    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 526        match block {
 527            acp::ContentBlock::Text(text_content) => text_content.text,
 528            acp::ContentBlock::ResourceLink(resource_link) => {
 529                Self::resource_link_md(&resource_link.uri, path_style)
 530            }
 531            acp::ContentBlock::Resource(acp::EmbeddedResource {
 532                resource:
 533                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 534                        uri,
 535                        ..
 536                    }),
 537                ..
 538            }) => Self::resource_link_md(&uri, path_style),
 539            acp::ContentBlock::Image(image) => Self::image_md(&image),
 540            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 541        }
 542    }
 543
 544    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 545        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 546            uri.as_link().to_string()
 547        } else {
 548            uri.to_string()
 549        }
 550    }
 551
 552    fn image_md(_image: &acp::ImageContent) -> String {
 553        "`Image`".into()
 554    }
 555
 556    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 557        match self {
 558            ContentBlock::Empty => "",
 559            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 560            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 561        }
 562    }
 563
 564    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 565        match self {
 566            ContentBlock::Empty => None,
 567            ContentBlock::Markdown { markdown } => Some(markdown),
 568            ContentBlock::ResourceLink { .. } => None,
 569        }
 570    }
 571
 572    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 573        match self {
 574            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 575            _ => None,
 576        }
 577    }
 578}
 579
 580#[derive(Debug)]
 581pub enum ToolCallContent {
 582    ContentBlock(ContentBlock),
 583    Diff(Entity<Diff>),
 584    Terminal(Entity<Terminal>),
 585}
 586
 587impl ToolCallContent {
 588    pub fn from_acp(
 589        content: acp::ToolCallContent,
 590        language_registry: Arc<LanguageRegistry>,
 591        path_style: PathStyle,
 592        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 593        cx: &mut App,
 594    ) -> Result<Self> {
 595        match content {
 596            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 597                content,
 598                &language_registry,
 599                path_style,
 600                cx,
 601            ))),
 602            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 603                Diff::finalized(
 604                    diff.path.to_string_lossy().into_owned(),
 605                    diff.old_text,
 606                    diff.new_text,
 607                    language_registry,
 608                    cx,
 609                )
 610            }))),
 611            acp::ToolCallContent::Terminal { terminal_id } => terminals
 612                .get(&terminal_id)
 613                .cloned()
 614                .map(Self::Terminal)
 615                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 616        }
 617    }
 618
 619    pub fn update_from_acp(
 620        &mut self,
 621        new: acp::ToolCallContent,
 622        language_registry: Arc<LanguageRegistry>,
 623        path_style: PathStyle,
 624        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 625        cx: &mut App,
 626    ) -> Result<()> {
 627        let needs_update = match (&self, &new) {
 628            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 629                old_diff.read(cx).needs_update(
 630                    new_diff.old_text.as_deref().unwrap_or(""),
 631                    &new_diff.new_text,
 632                    cx,
 633                )
 634            }
 635            _ => true,
 636        };
 637
 638        if needs_update {
 639            *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
 640        }
 641        Ok(())
 642    }
 643
 644    pub fn to_markdown(&self, cx: &App) -> String {
 645        match self {
 646            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 647            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 648            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 649        }
 650    }
 651}
 652
 653#[derive(Debug, PartialEq)]
 654pub enum ToolCallUpdate {
 655    UpdateFields(acp::ToolCallUpdate),
 656    UpdateDiff(ToolCallUpdateDiff),
 657    UpdateTerminal(ToolCallUpdateTerminal),
 658}
 659
 660impl ToolCallUpdate {
 661    fn id(&self) -> &acp::ToolCallId {
 662        match self {
 663            Self::UpdateFields(update) => &update.id,
 664            Self::UpdateDiff(diff) => &diff.id,
 665            Self::UpdateTerminal(terminal) => &terminal.id,
 666        }
 667    }
 668}
 669
 670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 671    fn from(update: acp::ToolCallUpdate) -> Self {
 672        Self::UpdateFields(update)
 673    }
 674}
 675
 676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 677    fn from(diff: ToolCallUpdateDiff) -> Self {
 678        Self::UpdateDiff(diff)
 679    }
 680}
 681
 682#[derive(Debug, PartialEq)]
 683pub struct ToolCallUpdateDiff {
 684    pub id: acp::ToolCallId,
 685    pub diff: Entity<Diff>,
 686}
 687
 688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 689    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 690        Self::UpdateTerminal(terminal)
 691    }
 692}
 693
 694#[derive(Debug, PartialEq)]
 695pub struct ToolCallUpdateTerminal {
 696    pub id: acp::ToolCallId,
 697    pub terminal: Entity<Terminal>,
 698}
 699
 700#[derive(Debug, Default)]
 701pub struct Plan {
 702    pub entries: Vec<PlanEntry>,
 703}
 704
 705#[derive(Debug)]
 706pub struct PlanStats<'a> {
 707    pub in_progress_entry: Option<&'a PlanEntry>,
 708    pub pending: u32,
 709    pub completed: u32,
 710}
 711
 712impl Plan {
 713    pub fn is_empty(&self) -> bool {
 714        self.entries.is_empty()
 715    }
 716
 717    pub fn stats(&self) -> PlanStats<'_> {
 718        let mut stats = PlanStats {
 719            in_progress_entry: None,
 720            pending: 0,
 721            completed: 0,
 722        };
 723
 724        for entry in &self.entries {
 725            match &entry.status {
 726                acp::PlanEntryStatus::Pending => {
 727                    stats.pending += 1;
 728                }
 729                acp::PlanEntryStatus::InProgress => {
 730                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 731                }
 732                acp::PlanEntryStatus::Completed => {
 733                    stats.completed += 1;
 734                }
 735            }
 736        }
 737
 738        stats
 739    }
 740}
 741
 742#[derive(Debug)]
 743pub struct PlanEntry {
 744    pub content: Entity<Markdown>,
 745    pub priority: acp::PlanEntryPriority,
 746    pub status: acp::PlanEntryStatus,
 747}
 748
 749impl PlanEntry {
 750    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 751        Self {
 752            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 753            priority: entry.priority,
 754            status: entry.status,
 755        }
 756    }
 757}
 758
 759#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 760pub struct TokenUsage {
 761    pub max_tokens: u64,
 762    pub used_tokens: u64,
 763}
 764
 765impl TokenUsage {
 766    pub fn ratio(&self) -> TokenUsageRatio {
 767        #[cfg(debug_assertions)]
 768        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 769            .unwrap_or("0.8".to_string())
 770            .parse()
 771            .unwrap();
 772        #[cfg(not(debug_assertions))]
 773        let warning_threshold: f32 = 0.8;
 774
 775        // When the maximum is unknown because there is no selected model,
 776        // avoid showing the token limit warning.
 777        if self.max_tokens == 0 {
 778            TokenUsageRatio::Normal
 779        } else if self.used_tokens >= self.max_tokens {
 780            TokenUsageRatio::Exceeded
 781        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 782            TokenUsageRatio::Warning
 783        } else {
 784            TokenUsageRatio::Normal
 785        }
 786    }
 787}
 788
 789#[derive(Debug, Clone, PartialEq, Eq)]
 790pub enum TokenUsageRatio {
 791    Normal,
 792    Warning,
 793    Exceeded,
 794}
 795
 796#[derive(Debug, Clone)]
 797pub struct RetryStatus {
 798    pub last_error: SharedString,
 799    pub attempt: usize,
 800    pub max_attempts: usize,
 801    pub started_at: Instant,
 802    pub duration: Duration,
 803}
 804
 805pub struct AcpThread {
 806    title: SharedString,
 807    entries: Vec<AgentThreadEntry>,
 808    plan: Plan,
 809    project: Entity<Project>,
 810    action_log: Entity<ActionLog>,
 811    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 812    send_task: Option<Task<()>>,
 813    connection: Rc<dyn AgentConnection>,
 814    session_id: acp::SessionId,
 815    token_usage: Option<TokenUsage>,
 816    prompt_capabilities: acp::PromptCapabilities,
 817    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 818    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 819    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 820    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 821}
 822
 823#[derive(Debug)]
 824pub enum AcpThreadEvent {
 825    NewEntry,
 826    TitleUpdated,
 827    TokenUsageUpdated,
 828    EntryUpdated(usize),
 829    EntriesRemoved(Range<usize>),
 830    ToolAuthorizationRequired,
 831    Retry(RetryStatus),
 832    Stopped,
 833    Error,
 834    LoadError(LoadError),
 835    PromptCapabilitiesUpdated,
 836    Refusal,
 837    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
 838    ModeUpdated(acp::SessionModeId),
 839}
 840
 841impl EventEmitter<AcpThreadEvent> for AcpThread {}
 842
 843#[derive(Debug, Clone)]
 844pub enum TerminalProviderEvent {
 845    Created {
 846        terminal_id: acp::TerminalId,
 847        label: String,
 848        cwd: Option<PathBuf>,
 849        output_byte_limit: Option<u64>,
 850        terminal: Entity<::terminal::Terminal>,
 851    },
 852    Output {
 853        terminal_id: acp::TerminalId,
 854        data: Vec<u8>,
 855    },
 856    TitleChanged {
 857        terminal_id: acp::TerminalId,
 858        title: String,
 859    },
 860    Exit {
 861        terminal_id: acp::TerminalId,
 862        status: acp::TerminalExitStatus,
 863    },
 864}
 865
 866#[derive(Debug, Clone)]
 867pub enum TerminalProviderCommand {
 868    WriteInput {
 869        terminal_id: acp::TerminalId,
 870        bytes: Vec<u8>,
 871    },
 872    Resize {
 873        terminal_id: acp::TerminalId,
 874        cols: u16,
 875        rows: u16,
 876    },
 877    Close {
 878        terminal_id: acp::TerminalId,
 879    },
 880}
 881
 882impl AcpThread {
 883    pub fn on_terminal_provider_event(
 884        &mut self,
 885        event: TerminalProviderEvent,
 886        cx: &mut Context<Self>,
 887    ) {
 888        match event {
 889            TerminalProviderEvent::Created {
 890                terminal_id,
 891                label,
 892                cwd,
 893                output_byte_limit,
 894                terminal,
 895            } => {
 896                let entity = self.register_terminal_created(
 897                    terminal_id.clone(),
 898                    label,
 899                    cwd,
 900                    output_byte_limit,
 901                    terminal,
 902                    cx,
 903                );
 904
 905                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 906                    for data in chunks.drain(..) {
 907                        entity.update(cx, |term, cx| {
 908                            term.inner().update(cx, |inner, cx| {
 909                                inner.write_output(&data, cx);
 910                            })
 911                        });
 912                    }
 913                }
 914
 915                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 916                    entity.update(cx, |_term, cx| {
 917                        cx.notify();
 918                    });
 919                }
 920
 921                cx.notify();
 922            }
 923            TerminalProviderEvent::Output { terminal_id, data } => {
 924                if let Some(entity) = self.terminals.get(&terminal_id) {
 925                    entity.update(cx, |term, cx| {
 926                        term.inner().update(cx, |inner, cx| {
 927                            inner.write_output(&data, cx);
 928                        })
 929                    });
 930                } else {
 931                    self.pending_terminal_output
 932                        .entry(terminal_id)
 933                        .or_default()
 934                        .push(data);
 935                }
 936            }
 937            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 938                if let Some(entity) = self.terminals.get(&terminal_id) {
 939                    entity.update(cx, |term, cx| {
 940                        term.inner().update(cx, |inner, cx| {
 941                            inner.breadcrumb_text = title;
 942                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 943                        })
 944                    });
 945                }
 946            }
 947            TerminalProviderEvent::Exit {
 948                terminal_id,
 949                status,
 950            } => {
 951                if let Some(entity) = self.terminals.get(&terminal_id) {
 952                    entity.update(cx, |_term, cx| {
 953                        cx.notify();
 954                    });
 955                } else {
 956                    self.pending_terminal_exit.insert(terminal_id, status);
 957                }
 958            }
 959        }
 960    }
 961}
 962
 963#[derive(PartialEq, Eq, Debug)]
 964pub enum ThreadStatus {
 965    Idle,
 966    Generating,
 967}
 968
/// Describes why loading failed; the `Display` impl provides the user-facing text.
// NOTE(review): presumably this covers loading an agent server — confirm against callers.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found at `command` is not supported; at least `minimum_version` is needed.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the detail shown after "Failed to install: ".
    FailedToInstall(SharedString),
    /// The server exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, carried as a message that is displayed verbatim.
    Other(SharedString),
}
 982
 983impl Display for LoadError {
 984    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 985        match self {
 986            LoadError::Unsupported {
 987                command: path,
 988                current_version,
 989                minimum_version,
 990            } => {
 991                write!(
 992                    f,
 993                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
 994                )
 995            }
 996            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
 997            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
 998            LoadError::Other(msg) => write!(f, "{msg}"),
 999        }
1000    }
1001}
1002
// Marker impl: `LoadError` relies on the default `Error` methods (no source is exposed).
impl Error for LoadError {}
1004
1005impl AcpThread {
1006    pub fn new(
1007        title: impl Into<SharedString>,
1008        connection: Rc<dyn AgentConnection>,
1009        project: Entity<Project>,
1010        action_log: Entity<ActionLog>,
1011        session_id: acp::SessionId,
1012        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1013        cx: &mut Context<Self>,
1014    ) -> Self {
1015        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1016        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1017            loop {
1018                let caps = prompt_capabilities_rx.recv().await?;
1019                this.update(cx, |this, cx| {
1020                    this.prompt_capabilities = caps;
1021                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1022                })?;
1023            }
1024        });
1025
1026        Self {
1027            action_log,
1028            shared_buffers: Default::default(),
1029            entries: Default::default(),
1030            plan: Default::default(),
1031            title: title.into(),
1032            project,
1033            send_task: None,
1034            connection,
1035            session_id,
1036            token_usage: None,
1037            prompt_capabilities,
1038            _observe_prompt_capabilities: task,
1039            terminals: HashMap::default(),
1040            pending_terminal_output: HashMap::default(),
1041            pending_terminal_exit: HashMap::default(),
1042        }
1043    }
1044
    /// Returns a clone of the prompt capabilities currently stored on the thread.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1048
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1052
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1056
    /// Returns the project entity associated with this thread.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1060
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1064
    /// Returns all entries of the thread, oldest first (entries are appended via `push_entry`).
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1068
    /// Returns the id of the agent session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1072
1073    pub fn status(&self) -> ThreadStatus {
1074        if self.send_task.is_some() {
1075            ThreadStatus::Generating
1076        } else {
1077            ThreadStatus::Idle
1078        }
1079    }
1080
    /// Returns the most recently recorded token usage, if any has been set.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1084
1085    pub fn has_pending_edit_tool_calls(&self) -> bool {
1086        for entry in self.entries.iter().rev() {
1087            match entry {
1088                AgentThreadEntry::UserMessage(_) => return false,
1089                AgentThreadEntry::ToolCall(
1090                    call @ ToolCall {
1091                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1092                        ..
1093                    },
1094                ) if call.diffs().next().is_some() => {
1095                    return true;
1096                }
1097                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1098            }
1099        }
1100
1101        false
1102    }
1103
1104    pub fn used_tools_since_last_user_message(&self) -> bool {
1105        for entry in self.entries.iter().rev() {
1106            match entry {
1107                AgentThreadEntry::UserMessage(..) => return false,
1108                AgentThreadEntry::AssistantMessage(..) => continue,
1109                AgentThreadEntry::ToolCall(..) => return true,
1110            }
1111        }
1112
1113        false
1114    }
1115
1116    pub fn handle_session_update(
1117        &mut self,
1118        update: acp::SessionUpdate,
1119        cx: &mut Context<Self>,
1120    ) -> Result<(), acp::Error> {
1121        match update {
1122            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1123                self.push_user_content_block(None, content, cx);
1124            }
1125            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1126                self.push_assistant_content_block(content, false, cx);
1127            }
1128            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1129                self.push_assistant_content_block(content, true, cx);
1130            }
1131            acp::SessionUpdate::ToolCall(tool_call) => {
1132                self.upsert_tool_call(tool_call, cx)?;
1133            }
1134            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1135                self.update_tool_call(tool_call_update, cx)?;
1136            }
1137            acp::SessionUpdate::Plan(plan) => {
1138                self.update_plan(plan, cx);
1139            }
1140            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1141                available_commands,
1142                ..
1143            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1144            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1145                current_mode_id,
1146                ..
1147            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1148        }
1149        Ok(())
1150    }
1151
1152    pub fn push_user_content_block(
1153        &mut self,
1154        message_id: Option<UserMessageId>,
1155        chunk: acp::ContentBlock,
1156        cx: &mut Context<Self>,
1157    ) {
1158        let language_registry = self.project.read(cx).languages().clone();
1159        let path_style = self.project.read(cx).path_style(cx);
1160        let entries_len = self.entries.len();
1161
1162        if let Some(last_entry) = self.entries.last_mut()
1163            && let AgentThreadEntry::UserMessage(UserMessage {
1164                id,
1165                content,
1166                chunks,
1167                ..
1168            }) = last_entry
1169        {
1170            *id = message_id.or(id.take());
1171            content.append(chunk.clone(), &language_registry, path_style, cx);
1172            chunks.push(chunk);
1173            let idx = entries_len - 1;
1174            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1175        } else {
1176            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1177            self.push_entry(
1178                AgentThreadEntry::UserMessage(UserMessage {
1179                    id: message_id,
1180                    content,
1181                    chunks: vec![chunk],
1182                    checkpoint: None,
1183                }),
1184                cx,
1185            );
1186        }
1187    }
1188
1189    pub fn push_assistant_content_block(
1190        &mut self,
1191        chunk: acp::ContentBlock,
1192        is_thought: bool,
1193        cx: &mut Context<Self>,
1194    ) {
1195        let language_registry = self.project.read(cx).languages().clone();
1196        let path_style = self.project.read(cx).path_style(cx);
1197        let entries_len = self.entries.len();
1198        if let Some(last_entry) = self.entries.last_mut()
1199            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1200        {
1201            let idx = entries_len - 1;
1202            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1203            match (chunks.last_mut(), is_thought) {
1204                (Some(AssistantMessageChunk::Message { block }), false)
1205                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1206                    block.append(chunk, &language_registry, path_style, cx)
1207                }
1208                _ => {
1209                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1210                    if is_thought {
1211                        chunks.push(AssistantMessageChunk::Thought { block })
1212                    } else {
1213                        chunks.push(AssistantMessageChunk::Message { block })
1214                    }
1215                }
1216            }
1217        } else {
1218            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1219            let chunk = if is_thought {
1220                AssistantMessageChunk::Thought { block }
1221            } else {
1222                AssistantMessageChunk::Message { block }
1223            };
1224
1225            self.push_entry(
1226                AgentThreadEntry::AssistantMessage(AssistantMessage {
1227                    chunks: vec![chunk],
1228                }),
1229                cx,
1230            );
1231        }
1232    }
1233
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1238
    /// Returns `true` if the connection provides a title-setting handle for this session,
    /// i.e. `connection.set_title` returns `Some`.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1242
1243    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1244        if title != self.title {
1245            self.title = title.clone();
1246            cx.emit(AcpThreadEvent::TitleUpdated);
1247            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1248                return set_title.run(title, cx);
1249            }
1250        }
1251        Task::ready(Ok(()))
1252    }
1253
    /// Replaces the stored token usage (`None` clears it) and emits
    /// `AcpThreadEvent::TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1258
    /// Emits `AcpThreadEvent::Retry` carrying `status`; nothing is stored on the thread itself.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1262
1263    pub fn update_tool_call(
1264        &mut self,
1265        update: impl Into<ToolCallUpdate>,
1266        cx: &mut Context<Self>,
1267    ) -> Result<()> {
1268        let update = update.into();
1269        let languages = self.project.read(cx).languages().clone();
1270        let path_style = self.project.read(cx).path_style(cx);
1271
1272        let ix = match self.index_for_tool_call(update.id()) {
1273            Some(ix) => ix,
1274            None => {
1275                // Tool call not found - create a failed tool call entry
1276                let failed_tool_call = ToolCall {
1277                    id: update.id().clone(),
1278                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1279                    kind: acp::ToolKind::Fetch,
1280                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1281                        acp::ContentBlock::Text(acp::TextContent {
1282                            text: "Tool call not found".to_string(),
1283                            annotations: None,
1284                            meta: None,
1285                        }),
1286                        &languages,
1287                        path_style,
1288                        cx,
1289                    ))],
1290                    status: ToolCallStatus::Failed,
1291                    locations: Vec::new(),
1292                    resolved_locations: Vec::new(),
1293                    raw_input: None,
1294                    raw_output: None,
1295                };
1296                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1297                return Ok(());
1298            }
1299        };
1300        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1301            unreachable!()
1302        };
1303
1304        match update {
1305            ToolCallUpdate::UpdateFields(update) => {
1306                let location_updated = update.fields.locations.is_some();
1307                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1308                if location_updated {
1309                    self.resolve_locations(update.id, cx);
1310                }
1311            }
1312            ToolCallUpdate::UpdateDiff(update) => {
1313                call.content.clear();
1314                call.content.push(ToolCallContent::Diff(update.diff));
1315            }
1316            ToolCallUpdate::UpdateTerminal(update) => {
1317                call.content.clear();
1318                call.content
1319                    .push(ToolCallContent::Terminal(update.terminal));
1320            }
1321        }
1322
1323        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1324
1325        Ok(())
1326    }
1327
    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
    ///
    /// The status is taken from `tool_call.status`; the rest of the work is delegated to
    /// `upsert_tool_call_inner`, whose errors are returned unchanged.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1337
1338    /// Fails if id does not match an existing entry.
1339    pub fn upsert_tool_call_inner(
1340        &mut self,
1341        update: acp::ToolCallUpdate,
1342        status: ToolCallStatus,
1343        cx: &mut Context<Self>,
1344    ) -> Result<(), acp::Error> {
1345        let language_registry = self.project.read(cx).languages().clone();
1346        let path_style = self.project.read(cx).path_style(cx);
1347        let id = update.id.clone();
1348
1349        if let Some(ix) = self.index_for_tool_call(&id) {
1350            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1351                unreachable!()
1352            };
1353
1354            call.update_fields(
1355                update.fields,
1356                language_registry,
1357                path_style,
1358                &self.terminals,
1359                cx,
1360            )?;
1361            call.status = status;
1362
1363            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1364        } else {
1365            let call = ToolCall::from_acp(
1366                update.try_into()?,
1367                status,
1368                language_registry,
1369                self.project.read(cx).path_style(cx),
1370                &self.terminals,
1371                cx,
1372            )?;
1373            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1374        };
1375
1376        self.resolve_locations(id, cx);
1377        Ok(())
1378    }
1379
1380    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1381        self.entries
1382            .iter()
1383            .enumerate()
1384            .rev()
1385            .find_map(|(index, entry)| {
1386                if let AgentThreadEntry::ToolCall(tool_call) = entry
1387                    && &tool_call.id == id
1388                {
1389                    Some(index)
1390                } else {
1391                    None
1392                }
1393            })
1394    }
1395
1396    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1397        // The tool call we are looking for is typically the last one, or very close to the end.
1398        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1399        self.entries
1400            .iter_mut()
1401            .enumerate()
1402            .rev()
1403            .find_map(|(index, tool_call)| {
1404                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1405                    && &tool_call.id == id
1406                {
1407                    Some((index, tool_call))
1408                } else {
1409                    None
1410                }
1411            })
1412    }
1413
1414    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1415        self.entries
1416            .iter()
1417            .enumerate()
1418            .rev()
1419            .find_map(|(index, tool_call)| {
1420                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1421                    && &tool_call.id == id
1422                {
1423                    Some((index, tool_call))
1424                } else {
1425                    None
1426                }
1427            })
1428    }
1429
    /// Starts resolving the locations of the tool call with the given `id` and, once done,
    /// records the result on the thread.
    ///
    /// Does nothing if no such tool call exists. The follow-up work runs in a detached task:
    /// a snapshot of every resolved buffer is stored in `shared_buffers`, the project's agent
    /// location is moved to the last resolved location (unless that would be ignored, see
    /// below), and the tool call's `resolved_locations` are replaced, emitting `EntryUpdated`
    /// only when they actually changed.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                // Snapshot every buffer that was resolved, skipping locations that were not.
                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call is looked up again because entries may have changed while
                // the resolution task was running.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                // Only the last location is considered for the agent location, and only if
                // it was resolved.
                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // Ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line.
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Avoid emitting an update when nothing changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1486
1487    pub fn request_tool_call_authorization(
1488        &mut self,
1489        tool_call: acp::ToolCallUpdate,
1490        options: Vec<acp::PermissionOption>,
1491        respect_always_allow_setting: bool,
1492        cx: &mut Context<Self>,
1493    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1494        let (tx, rx) = oneshot::channel();
1495
1496        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1497            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1498            // some tools would (incorrectly) continue to auto-accept.
1499            if let Some(allow_once_option) = options.iter().find_map(|option| {
1500                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1501                    Some(option.id.clone())
1502                } else {
1503                    None
1504                }
1505            }) {
1506                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1507                return Ok(async {
1508                    acp::RequestPermissionOutcome::Selected {
1509                        option_id: allow_once_option,
1510                    }
1511                }
1512                .boxed());
1513            }
1514        }
1515
1516        let status = ToolCallStatus::WaitingForConfirmation {
1517            options,
1518            respond_tx: tx,
1519        };
1520
1521        self.upsert_tool_call_inner(tool_call, status, cx)?;
1522        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1523
1524        let fut = async {
1525            match rx.await {
1526                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1527                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1528            }
1529        }
1530        .boxed();
1531
1532        Ok(fut)
1533    }
1534
1535    pub fn authorize_tool_call(
1536        &mut self,
1537        id: acp::ToolCallId,
1538        option_id: acp::PermissionOptionId,
1539        option_kind: acp::PermissionOptionKind,
1540        cx: &mut Context<Self>,
1541    ) {
1542        let Some((ix, call)) = self.tool_call_mut(&id) else {
1543            return;
1544        };
1545
1546        let new_status = match option_kind {
1547            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1548                ToolCallStatus::Rejected
1549            }
1550            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1551                ToolCallStatus::InProgress
1552            }
1553        };
1554
1555        let curr_status = mem::replace(&mut call.status, new_status);
1556
1557        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1558            respond_tx.send(option_id).log_err();
1559        } else if cfg!(debug_assertions) {
1560            panic!("tried to authorize an already authorized tool call");
1561        }
1562
1563        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1564    }
1565
1566    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1567        let mut first_tool_call = None;
1568
1569        for entry in self.entries.iter().rev() {
1570            match &entry {
1571                AgentThreadEntry::ToolCall(call) => {
1572                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1573                        first_tool_call = Some(call);
1574                    } else {
1575                        continue;
1576                    }
1577                }
1578                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1579                    // Reached the beginning of the turn.
1580                    // If we had pending permission requests in the previous turn, they have been cancelled.
1581                    break;
1582                }
1583            }
1584        }
1585
1586        first_tool_call
1587    }
1588
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1592
1593    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1594        let new_entries_len = request.entries.len();
1595        let mut new_entries = request.entries.into_iter();
1596
1597        // Reuse existing markdown to prevent flickering
1598        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1599            let PlanEntry {
1600                content,
1601                priority,
1602                status,
1603            } = old;
1604            content.update(cx, |old, cx| {
1605                old.replace(new.content, cx);
1606            });
1607            *priority = new.priority;
1608            *status = new.status;
1609        }
1610        for new in new_entries {
1611            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1612        }
1613        self.plan.entries.truncate(new_entries_len);
1614
1615        cx.notify();
1616    }
1617
    /// Removes every plan entry whose status is `Completed` and notifies observers.
    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
        self.plan
            .entries
            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
        cx.notify();
    }
1624
1625    #[cfg(any(test, feature = "test-support"))]
1626    pub fn send_raw(
1627        &mut self,
1628        message: &str,
1629        cx: &mut Context<Self>,
1630    ) -> BoxFuture<'static, Result<()>> {
1631        self.send(
1632            vec![acp::ContentBlock::Text(acp::TextContent {
1633                text: message.to_string(),
1634                annotations: None,
1635                meta: None,
1636            })],
1637            cx,
1638        )
1639    }
1640
    /// Sends `message` to the agent as a new user prompt and runs the resulting turn.
    ///
    /// Within the turn, the user message is first added to the thread, then a git checkpoint
    /// is taken and attached (hidden) to that message, and finally the prompt is sent over the
    /// connection. The returned future is the one produced by `run_turn`.
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        // Rendered form of the whole message, used for the thread entry.
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            self.project.read(cx).path_style(cx),
            cx,
        );
        let request = acp::PromptRequest {
            prompt: message.clone(),
            session_id: self.session_id.clone(),
            meta: None,
        };
        let git_store = self.project.read(cx).git_store().clone();

        // Only assign a message id when the connection offers a truncate handle for
        // this session.
        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
            Some(UserMessageId::new())
        } else {
            None
        };

        self.run_turn(cx, async move |this, cx| {
            // Best effort: a failure to update the thread here is deliberately ignored.
            this.update(cx, |this, cx| {
                this.push_entry(
                    AgentThreadEntry::UserMessage(UserMessage {
                        id: message_id.clone(),
                        content: block,
                        chunks: message,
                        checkpoint: None,
                    }),
                    cx,
                );
            })
            .ok();

            // A checkpoint failure is logged and leaves the message without a checkpoint.
            let old_checkpoint = git_store
                .update(cx, |git, cx| git.checkpoint(cx))?
                .await
                .context("failed to get old checkpoint")
                .log_err();
            this.update(cx, |this, cx| {
                if let Some((_ix, message)) = this.last_user_message() {
                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
                        git_checkpoint,
                        show: false,
                    });
                }
                this.connection.prompt(message_id, request, cx)
            })?
            .await
        })
    }
1696
    /// Returns `true` if the connection provides a resume handle for this session,
    /// i.e. `connection.resume` returns `Some`.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1700
1701    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1702        self.run_turn(cx, async move |this, cx| {
1703            this.update(cx, |this, cx| {
1704                this.connection
1705                    .resume(&this.session_id, cx)
1706                    .map(|resume| resume.run(cx))
1707            })?
1708            .context("resuming a session is not supported")?
1709            .await
1710        })
1711    }
1712
1713    fn run_turn(
1714        &mut self,
1715        cx: &mut Context<Self>,
1716        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1717    ) -> BoxFuture<'static, Result<()>> {
1718        self.clear_completed_plan_entries(cx);
1719
1720        let (tx, rx) = oneshot::channel();
1721        let cancel_task = self.cancel(cx);
1722
1723        self.send_task = Some(cx.spawn(async move |this, cx| {
1724            cancel_task.await;
1725            tx.send(f(this, cx).await).ok();
1726        }));
1727
1728        cx.spawn(async move |this, cx| {
1729            let response = rx.await;
1730
1731            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1732                .await?;
1733
1734            this.update(cx, |this, cx| {
1735                this.project
1736                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1737                match response {
1738                    Ok(Err(e)) => {
1739                        this.send_task.take();
1740                        cx.emit(AcpThreadEvent::Error);
1741                        Err(e)
1742                    }
1743                    result => {
1744                        let canceled = matches!(
1745                            result,
1746                            Ok(Ok(acp::PromptResponse {
1747                                stop_reason: acp::StopReason::Cancelled,
1748                                meta: None,
1749                            }))
1750                        );
1751
1752                        // We only take the task if the current prompt wasn't canceled.
1753                        //
1754                        // This prompt may have been canceled because another one was sent
1755                        // while it was still generating. In these cases, dropping `send_task`
1756                        // would cause the next generation to be canceled.
1757                        if !canceled {
1758                            this.send_task.take();
1759                        }
1760
1761                        // Handle refusal - distinguish between user prompt and tool call refusals
1762                        if let Ok(Ok(acp::PromptResponse {
1763                            stop_reason: acp::StopReason::Refusal,
1764                            meta: _,
1765                        })) = result
1766                        {
1767                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1768                                // Check if there's a completed tool call with results after the last user message
1769                                // This indicates the refusal is in response to tool output, not the user's prompt
1770                                let has_completed_tool_call_after_user_msg =
1771                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1772                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1773                                            // Check if the tool call has completed and has output
1774                                            matches!(tool_call.status, ToolCallStatus::Completed)
1775                                                && tool_call.raw_output.is_some()
1776                                        } else {
1777                                            false
1778                                        }
1779                                    });
1780
1781                                if has_completed_tool_call_after_user_msg {
1782                                    // Refusal is due to tool output - don't truncate, just notify
1783                                    // The model refused based on what the tool returned
1784                                    cx.emit(AcpThreadEvent::Refusal);
1785                                } else {
1786                                    // User prompt was refused - truncate back to before the user message
1787                                    let range = user_msg_ix..this.entries.len();
1788                                    if range.start < range.end {
1789                                        this.entries.truncate(user_msg_ix);
1790                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1791                                    }
1792                                    cx.emit(AcpThreadEvent::Refusal);
1793                                }
1794                            } else {
1795                                // No user message found, treat as general refusal
1796                                cx.emit(AcpThreadEvent::Refusal);
1797                            }
1798                        }
1799
1800                        cx.emit(AcpThreadEvent::Stopped);
1801                        Ok(())
1802                    }
1803                }
1804            })?
1805        })
1806        .boxed()
1807    }
1808
1809    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1810        let Some(send_task) = self.send_task.take() else {
1811            return Task::ready(());
1812        };
1813
1814        for entry in self.entries.iter_mut() {
1815            if let AgentThreadEntry::ToolCall(call) = entry {
1816                let cancel = matches!(
1817                    call.status,
1818                    ToolCallStatus::Pending
1819                        | ToolCallStatus::WaitingForConfirmation { .. }
1820                        | ToolCallStatus::InProgress
1821                );
1822
1823                if cancel {
1824                    call.status = ToolCallStatus::Canceled;
1825                }
1826            }
1827        }
1828
1829        self.connection.cancel(&self.session_id, cx);
1830
1831        // Wait for the send task to complete
1832        cx.foreground_executor().spawn(send_task)
1833    }
1834
1835    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1836    pub fn restore_checkpoint(
1837        &mut self,
1838        id: UserMessageId,
1839        cx: &mut Context<Self>,
1840    ) -> Task<Result<()>> {
1841        let Some((_, message)) = self.user_message_mut(&id) else {
1842            return Task::ready(Err(anyhow!("message not found")));
1843        };
1844
1845        let checkpoint = message
1846            .checkpoint
1847            .as_ref()
1848            .map(|c| c.git_checkpoint.clone());
1849        let rewind = self.rewind(id.clone(), cx);
1850        let git_store = self.project.read(cx).git_store().clone();
1851
1852        cx.spawn(async move |_, cx| {
1853            rewind.await?;
1854            if let Some(checkpoint) = checkpoint {
1855                git_store
1856                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1857                    .await?;
1858            }
1859
1860            Ok(())
1861        })
1862    }
1863
1864    /// Rewinds this thread to before the entry at `index`, removing it and all
1865    /// subsequent entries while rejecting any action_log changes made from that point.
1866    /// Unlike `restore_checkpoint`, this method does not restore from git.
1867    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1868        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1869            return Task::ready(Err(anyhow!("not supported")));
1870        };
1871
1872        cx.spawn(async move |this, cx| {
1873            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1874            this.update(cx, |this, cx| {
1875                if let Some((ix, _)) = this.user_message_mut(&id) {
1876                    let range = ix..this.entries.len();
1877                    this.entries.truncate(ix);
1878                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1879                }
1880                this.action_log()
1881                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1882            })?
1883            .await;
1884            Ok(())
1885        })
1886    }
1887
1888    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1889        let git_store = self.project.read(cx).git_store().clone();
1890
1891        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1892            if let Some(checkpoint) = message.checkpoint.as_ref() {
1893                checkpoint.git_checkpoint.clone()
1894            } else {
1895                return Task::ready(Ok(()));
1896            }
1897        } else {
1898            return Task::ready(Ok(()));
1899        };
1900
1901        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1902        cx.spawn(async move |this, cx| {
1903            let new_checkpoint = new_checkpoint
1904                .await
1905                .context("failed to get new checkpoint")
1906                .log_err();
1907            if let Some(new_checkpoint) = new_checkpoint {
1908                let equal = git_store
1909                    .update(cx, |git, cx| {
1910                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1911                    })?
1912                    .await
1913                    .unwrap_or(true);
1914                this.update(cx, |this, cx| {
1915                    let (ix, message) = this.last_user_message().context("no user message")?;
1916                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1917                    checkpoint.show = !equal;
1918                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1919                    anyhow::Ok(())
1920                })??;
1921            }
1922
1923            Ok(())
1924        })
1925    }
1926
1927    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1928        self.entries
1929            .iter_mut()
1930            .enumerate()
1931            .rev()
1932            .find_map(|(ix, entry)| {
1933                if let AgentThreadEntry::UserMessage(message) = entry {
1934                    Some((ix, message))
1935                } else {
1936                    None
1937                }
1938            })
1939    }
1940
1941    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1942        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1943            if let AgentThreadEntry::UserMessage(message) = entry {
1944                if message.id.as_ref() == Some(id) {
1945                    Some((ix, message))
1946                } else {
1947                    None
1948                }
1949            } else {
1950                None
1951            }
1952        })
1953    }
1954
1955    pub fn read_text_file(
1956        &self,
1957        path: PathBuf,
1958        line: Option<u32>,
1959        limit: Option<u32>,
1960        reuse_shared_snapshot: bool,
1961        cx: &mut Context<Self>,
1962    ) -> Task<Result<String, acp::Error>> {
1963        // Args are 1-based, move to 0-based
1964        let line = line.unwrap_or_default().saturating_sub(1);
1965        let limit = limit.unwrap_or(u32::MAX);
1966        let project = self.project.clone();
1967        let action_log = self.action_log.clone();
1968        cx.spawn(async move |this, cx| {
1969            let load = project
1970                .update(cx, |project, cx| {
1971                    let path = project
1972                        .project_path_for_absolute_path(&path, cx)
1973                        .ok_or_else(|| {
1974                            acp::Error::resource_not_found(Some(path.display().to_string()))
1975                        })?;
1976                    Ok(project.open_buffer(path, cx))
1977                })
1978                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1979                .flatten()?;
1980
1981            let buffer = load.await?;
1982
1983            let snapshot = if reuse_shared_snapshot {
1984                this.read_with(cx, |this, _| {
1985                    this.shared_buffers.get(&buffer.clone()).cloned()
1986                })
1987                .log_err()
1988                .flatten()
1989            } else {
1990                None
1991            };
1992
1993            let snapshot = if let Some(snapshot) = snapshot {
1994                snapshot
1995            } else {
1996                action_log.update(cx, |action_log, cx| {
1997                    action_log.buffer_read(buffer.clone(), cx);
1998                })?;
1999
2000                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2001                this.update(cx, |this, _| {
2002                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2003                })?;
2004                snapshot
2005            };
2006
2007            let max_point = snapshot.max_point();
2008            let start_position = Point::new(line, 0);
2009
2010            if start_position > max_point {
2011                return Err(acp::Error::invalid_params().with_data(format!(
2012                    "Attempting to read beyond the end of the file, line {}:{}",
2013                    max_point.row + 1,
2014                    max_point.column
2015                )));
2016            }
2017
2018            let start = snapshot.anchor_before(start_position);
2019            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2020
2021            project.update(cx, |project, cx| {
2022                project.set_agent_location(
2023                    Some(AgentLocation {
2024                        buffer: buffer.downgrade(),
2025                        position: start,
2026                    }),
2027                    cx,
2028                );
2029            })?;
2030
2031            Ok(snapshot.text_for_range(start..end).collect::<String>())
2032        })
2033    }
2034
2035    pub fn write_text_file(
2036        &self,
2037        path: PathBuf,
2038        content: String,
2039        cx: &mut Context<Self>,
2040    ) -> Task<Result<()>> {
2041        let project = self.project.clone();
2042        let action_log = self.action_log.clone();
2043        cx.spawn(async move |this, cx| {
2044            let load = project.update(cx, |project, cx| {
2045                let path = project
2046                    .project_path_for_absolute_path(&path, cx)
2047                    .context("invalid path")?;
2048                anyhow::Ok(project.open_buffer(path, cx))
2049            });
2050            let buffer = load??.await?;
2051            let snapshot = this.update(cx, |this, cx| {
2052                this.shared_buffers
2053                    .get(&buffer)
2054                    .cloned()
2055                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2056            })?;
2057            let edits = cx
2058                .background_executor()
2059                .spawn(async move {
2060                    let old_text = snapshot.text();
2061                    text_diff(old_text.as_str(), &content)
2062                        .into_iter()
2063                        .map(|(range, replacement)| {
2064                            (
2065                                snapshot.anchor_after(range.start)
2066                                    ..snapshot.anchor_before(range.end),
2067                                replacement,
2068                            )
2069                        })
2070                        .collect::<Vec<_>>()
2071                })
2072                .await;
2073
2074            project.update(cx, |project, cx| {
2075                project.set_agent_location(
2076                    Some(AgentLocation {
2077                        buffer: buffer.downgrade(),
2078                        position: edits
2079                            .last()
2080                            .map(|(range, _)| range.end)
2081                            .unwrap_or(Anchor::MIN),
2082                    }),
2083                    cx,
2084                );
2085            })?;
2086
2087            let format_on_save = cx.update(|cx| {
2088                action_log.update(cx, |action_log, cx| {
2089                    action_log.buffer_read(buffer.clone(), cx);
2090                });
2091
2092                let format_on_save = buffer.update(cx, |buffer, cx| {
2093                    buffer.edit(edits, None, cx);
2094
2095                    let settings = language::language_settings::language_settings(
2096                        buffer.language().map(|l| l.name()),
2097                        buffer.file(),
2098                        cx,
2099                    );
2100
2101                    settings.format_on_save != FormatOnSave::Off
2102                });
2103                action_log.update(cx, |action_log, cx| {
2104                    action_log.buffer_edited(buffer.clone(), cx);
2105                });
2106                format_on_save
2107            })?;
2108
2109            if format_on_save {
2110                let format_task = project.update(cx, |project, cx| {
2111                    project.format(
2112                        HashSet::from_iter([buffer.clone()]),
2113                        LspFormatTarget::Buffers,
2114                        false,
2115                        FormatTrigger::Save,
2116                        cx,
2117                    )
2118                })?;
2119                format_task.await.log_err();
2120
2121                action_log.update(cx, |action_log, cx| {
2122                    action_log.buffer_edited(buffer.clone(), cx);
2123                })?;
2124            }
2125
2126            project
2127                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2128                .await
2129        })
2130    }
2131
2132    pub fn create_terminal(
2133        &self,
2134        command: String,
2135        args: Vec<String>,
2136        extra_env: Vec<acp::EnvVariable>,
2137        cwd: Option<PathBuf>,
2138        output_byte_limit: Option<u64>,
2139        cx: &mut Context<Self>,
2140    ) -> Task<Result<Entity<Terminal>>> {
2141        let env = match &cwd {
2142            Some(dir) => self.project.update(cx, |project, cx| {
2143                project.environment().update(cx, |env, cx| {
2144                    env.directory_environment(dir.as_path().into(), cx)
2145                })
2146            }),
2147            None => Task::ready(None).shared(),
2148        };
2149        let env = cx.spawn(async move |_, _| {
2150            let mut env = env.await.unwrap_or_default();
2151            // Disables paging for `git` and hopefully other commands
2152            env.insert("PAGER".into(), "".into());
2153            for var in extra_env {
2154                env.insert(var.name, var.value);
2155            }
2156            env
2157        });
2158
2159        let project = self.project.clone();
2160        let language_registry = project.read(cx).languages().clone();
2161        let is_windows = project.read(cx).path_style(cx).is_windows();
2162
2163        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2164        let terminal_task = cx.spawn({
2165            let terminal_id = terminal_id.clone();
2166            async move |_this, cx| {
2167                let env = env.await;
2168                let shell = project
2169                    .update(cx, |project, cx| {
2170                        project
2171                            .remote_client()
2172                            .and_then(|r| r.read(cx).default_system_shell())
2173                    })?
2174                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2175                let (task_command, task_args) =
2176                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2177                        .redirect_stdin_to_dev_null()
2178                        .build(Some(command.clone()), &args);
2179                let terminal = project
2180                    .update(cx, |project, cx| {
2181                        project.create_terminal_task(
2182                            task::SpawnInTerminal {
2183                                command: Some(task_command),
2184                                args: task_args,
2185                                cwd: cwd.clone(),
2186                                env,
2187                                ..Default::default()
2188                            },
2189                            cx,
2190                        )
2191                    })?
2192                    .await?;
2193
2194                cx.new(|cx| {
2195                    Terminal::new(
2196                        terminal_id,
2197                        &format!("{} {}", command, args.join(" ")),
2198                        cwd,
2199                        output_byte_limit.map(|l| l as usize),
2200                        terminal,
2201                        language_registry,
2202                        cx,
2203                    )
2204                })
2205            }
2206        });
2207
2208        cx.spawn(async move |this, cx| {
2209            let terminal = terminal_task.await?;
2210            this.update(cx, |this, _cx| {
2211                this.terminals.insert(terminal_id, terminal.clone());
2212                terminal
2213            })
2214        })
2215    }
2216
2217    pub fn kill_terminal(
2218        &mut self,
2219        terminal_id: acp::TerminalId,
2220        cx: &mut Context<Self>,
2221    ) -> Result<()> {
2222        self.terminals
2223            .get(&terminal_id)
2224            .context("Terminal not found")?
2225            .update(cx, |terminal, cx| {
2226                terminal.kill(cx);
2227            });
2228
2229        Ok(())
2230    }
2231
2232    pub fn release_terminal(
2233        &mut self,
2234        terminal_id: acp::TerminalId,
2235        cx: &mut Context<Self>,
2236    ) -> Result<()> {
2237        self.terminals
2238            .remove(&terminal_id)
2239            .context("Terminal not found")?
2240            .update(cx, |terminal, cx| {
2241                terminal.kill(cx);
2242            });
2243
2244        Ok(())
2245    }
2246
2247    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2248        self.terminals
2249            .get(&terminal_id)
2250            .context("Terminal not found")
2251            .cloned()
2252    }
2253
2254    pub fn to_markdown(&self, cx: &App) -> String {
2255        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2256    }
2257
2258    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2259        cx.emit(AcpThreadEvent::LoadError(error));
2260    }
2261
2262    pub fn register_terminal_created(
2263        &mut self,
2264        terminal_id: acp::TerminalId,
2265        command_label: String,
2266        working_dir: Option<PathBuf>,
2267        output_byte_limit: Option<u64>,
2268        terminal: Entity<::terminal::Terminal>,
2269        cx: &mut Context<Self>,
2270    ) -> Entity<Terminal> {
2271        let language_registry = self.project.read(cx).languages().clone();
2272
2273        let entity = cx.new(|cx| {
2274            Terminal::new(
2275                terminal_id.clone(),
2276                &command_label,
2277                working_dir.clone(),
2278                output_byte_limit.map(|l| l as usize),
2279                terminal,
2280                language_registry,
2281                cx,
2282            )
2283        });
2284        self.terminals.insert(terminal_id.clone(), entity.clone());
2285        entity
2286    }
2287}
2288
2289fn markdown_for_raw_output(
2290    raw_output: &serde_json::Value,
2291    language_registry: &Arc<LanguageRegistry>,
2292    cx: &mut App,
2293) -> Option<Entity<Markdown>> {
2294    match raw_output {
2295        serde_json::Value::Null => None,
2296        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2297            Markdown::new(
2298                value.to_string().into(),
2299                Some(language_registry.clone()),
2300                None,
2301                cx,
2302            )
2303        })),
2304        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2305            Markdown::new(
2306                value.to_string().into(),
2307                Some(language_registry.clone()),
2308                None,
2309                cx,
2310            )
2311        })),
2312        serde_json::Value::String(value) => Some(cx.new(|cx| {
2313            Markdown::new(
2314                value.clone().into(),
2315                Some(language_registry.clone()),
2316                None,
2317                cx,
2318            )
2319        })),
2320        value => Some(cx.new(|cx| {
2321            Markdown::new(
2322                format!("```json\n{}\n```", value).into(),
2323                Some(language_registry.clone()),
2324                None,
2325                cx,
2326            )
2327        })),
2328    }
2329}
2330
2331#[cfg(test)]
2332mod tests {
2333    use super::*;
2334    use anyhow::anyhow;
2335    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2336    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2337    use indoc::indoc;
2338    use project::{FakeFs, Fs};
2339    use rand::{distr, prelude::*};
2340    use serde_json::json;
2341    use settings::SettingsStore;
2342    use smol::stream::StreamExt as _;
2343    use std::{
2344        any::Any,
2345        cell::RefCell,
2346        path::Path,
2347        rc::Rc,
2348        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2349        time::Duration,
2350    };
2351    use util::path;
2352
2353    fn init_test(cx: &mut TestAppContext) {
2354        env_logger::try_init().ok();
2355        cx.update(|cx| {
2356            let settings_store = SettingsStore::test(cx);
2357            cx.set_global(settings_store);
2358            Project::init_settings(cx);
2359            language::init(cx);
2360        });
2361    }
2362
2363    #[gpui::test]
2364    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2365        init_test(cx);
2366
2367        let fs = FakeFs::new(cx.executor());
2368        let project = Project::test(fs, [], cx).await;
2369        let connection = Rc::new(FakeAgentConnection::new());
2370        let thread = cx
2371            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2372            .await
2373            .unwrap();
2374
2375        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2376
2377        // Send Output BEFORE Created - should be buffered by acp_thread
2378        thread.update(cx, |thread, cx| {
2379            thread.on_terminal_provider_event(
2380                TerminalProviderEvent::Output {
2381                    terminal_id: terminal_id.clone(),
2382                    data: b"hello buffered".to_vec(),
2383                },
2384                cx,
2385            );
2386        });
2387
2388        // Create a display-only terminal and then send Created
2389        let lower = cx.new(|cx| {
2390            let builder = ::terminal::TerminalBuilder::new_display_only(
2391                ::terminal::terminal_settings::CursorShape::default(),
2392                ::terminal::terminal_settings::AlternateScroll::On,
2393                None,
2394                0,
2395            )
2396            .unwrap();
2397            builder.subscribe(cx)
2398        });
2399
2400        thread.update(cx, |thread, cx| {
2401            thread.on_terminal_provider_event(
2402                TerminalProviderEvent::Created {
2403                    terminal_id: terminal_id.clone(),
2404                    label: "Buffered Test".to_string(),
2405                    cwd: None,
2406                    output_byte_limit: None,
2407                    terminal: lower.clone(),
2408                },
2409                cx,
2410            );
2411        });
2412
2413        // After Created, buffered Output should have been flushed into the renderer
2414        let content = thread.read_with(cx, |thread, cx| {
2415            let term = thread.terminal(terminal_id.clone()).unwrap();
2416            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2417        });
2418
2419        assert!(
2420            content.contains("hello buffered"),
2421            "expected buffered output to render, got: {content}"
2422        );
2423    }
2424
2425    #[gpui::test]
2426    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2427        init_test(cx);
2428
2429        let fs = FakeFs::new(cx.executor());
2430        let project = Project::test(fs, [], cx).await;
2431        let connection = Rc::new(FakeAgentConnection::new());
2432        let thread = cx
2433            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2434            .await
2435            .unwrap();
2436
2437        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2438
2439        // Send Output BEFORE Created
2440        thread.update(cx, |thread, cx| {
2441            thread.on_terminal_provider_event(
2442                TerminalProviderEvent::Output {
2443                    terminal_id: terminal_id.clone(),
2444                    data: b"pre-exit data".to_vec(),
2445                },
2446                cx,
2447            );
2448        });
2449
2450        // Send Exit BEFORE Created
2451        thread.update(cx, |thread, cx| {
2452            thread.on_terminal_provider_event(
2453                TerminalProviderEvent::Exit {
2454                    terminal_id: terminal_id.clone(),
2455                    status: acp::TerminalExitStatus {
2456                        exit_code: Some(0),
2457                        signal: None,
2458                        meta: None,
2459                    },
2460                },
2461                cx,
2462            );
2463        });
2464
2465        // Now create a display-only lower-level terminal and send Created
2466        let lower = cx.new(|cx| {
2467            let builder = ::terminal::TerminalBuilder::new_display_only(
2468                ::terminal::terminal_settings::CursorShape::default(),
2469                ::terminal::terminal_settings::AlternateScroll::On,
2470                None,
2471                0,
2472            )
2473            .unwrap();
2474            builder.subscribe(cx)
2475        });
2476
2477        thread.update(cx, |thread, cx| {
2478            thread.on_terminal_provider_event(
2479                TerminalProviderEvent::Created {
2480                    terminal_id: terminal_id.clone(),
2481                    label: "Buffered Exit Test".to_string(),
2482                    cwd: None,
2483                    output_byte_limit: None,
2484                    terminal: lower.clone(),
2485                },
2486                cx,
2487            );
2488        });
2489
2490        // Output should be present after Created (flushed from buffer)
2491        let content = thread.read_with(cx, |thread, cx| {
2492            let term = thread.terminal(terminal_id.clone()).unwrap();
2493            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2494        });
2495
2496        assert!(
2497            content.contains("pre-exit data"),
2498            "expected pre-exit data to render, got: {content}"
2499        );
2500    }
2501
2502    #[gpui::test]
2503    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2504        init_test(cx);
2505
2506        let fs = FakeFs::new(cx.executor());
2507        let project = Project::test(fs, [], cx).await;
2508        let connection = Rc::new(FakeAgentConnection::new());
2509        let thread = cx
2510            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2511            .await
2512            .unwrap();
2513
2514        // Test creating a new user message
2515        thread.update(cx, |thread, cx| {
2516            thread.push_user_content_block(
2517                None,
2518                acp::ContentBlock::Text(acp::TextContent {
2519                    annotations: None,
2520                    text: "Hello, ".to_string(),
2521                    meta: None,
2522                }),
2523                cx,
2524            );
2525        });
2526
2527        thread.update(cx, |thread, cx| {
2528            assert_eq!(thread.entries.len(), 1);
2529            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2530                assert_eq!(user_msg.id, None);
2531                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2532            } else {
2533                panic!("Expected UserMessage");
2534            }
2535        });
2536
2537        // Test appending to existing user message
2538        let message_1_id = UserMessageId::new();
2539        thread.update(cx, |thread, cx| {
2540            thread.push_user_content_block(
2541                Some(message_1_id.clone()),
2542                acp::ContentBlock::Text(acp::TextContent {
2543                    annotations: None,
2544                    text: "world!".to_string(),
2545                    meta: None,
2546                }),
2547                cx,
2548            );
2549        });
2550
2551        thread.update(cx, |thread, cx| {
2552            assert_eq!(thread.entries.len(), 1);
2553            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2554                assert_eq!(user_msg.id, Some(message_1_id));
2555                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2556            } else {
2557                panic!("Expected UserMessage");
2558            }
2559        });
2560
2561        // Test creating new user message after assistant message
2562        thread.update(cx, |thread, cx| {
2563            thread.push_assistant_content_block(
2564                acp::ContentBlock::Text(acp::TextContent {
2565                    annotations: None,
2566                    text: "Assistant response".to_string(),
2567                    meta: None,
2568                }),
2569                false,
2570                cx,
2571            );
2572        });
2573
2574        let message_2_id = UserMessageId::new();
2575        thread.update(cx, |thread, cx| {
2576            thread.push_user_content_block(
2577                Some(message_2_id.clone()),
2578                acp::ContentBlock::Text(acp::TextContent {
2579                    annotations: None,
2580                    text: "New user message".to_string(),
2581                    meta: None,
2582                }),
2583                cx,
2584            );
2585        });
2586
2587        thread.update(cx, |thread, cx| {
2588            assert_eq!(thread.entries.len(), 3);
2589            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2590                assert_eq!(user_msg.id, Some(message_2_id));
2591                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2592            } else {
2593                panic!("Expected UserMessage at index 2");
2594            }
2595        });
2596    }
2597
2598    #[gpui::test]
2599    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2600        init_test(cx);
2601
2602        let fs = FakeFs::new(cx.executor());
2603        let project = Project::test(fs, [], cx).await;
2604        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2605            |_, thread, mut cx| {
2606                async move {
2607                    thread.update(&mut cx, |thread, cx| {
2608                        thread
2609                            .handle_session_update(
2610                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2611                                    content: "Thinking ".into(),
2612                                    meta: None,
2613                                }),
2614                                cx,
2615                            )
2616                            .unwrap();
2617                        thread
2618                            .handle_session_update(
2619                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2620                                    content: "hard!".into(),
2621                                    meta: None,
2622                                }),
2623                                cx,
2624                            )
2625                            .unwrap();
2626                    })?;
2627                    Ok(acp::PromptResponse {
2628                        stop_reason: acp::StopReason::EndTurn,
2629                        meta: None,
2630                    })
2631                }
2632                .boxed_local()
2633            },
2634        ));
2635
2636        let thread = cx
2637            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2638            .await
2639            .unwrap();
2640
2641        thread
2642            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2643            .await
2644            .unwrap();
2645
2646        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2647        assert_eq!(
2648            output,
2649            indoc! {r#"
2650            ## User
2651
2652            Hello from Zed!
2653
2654            ## Assistant
2655
2656            <thinking>
2657            Thinking hard!
2658            </thinking>
2659
2660            "#}
2661        );
2662    }
2663
2664    #[gpui::test]
2665    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2666        init_test(cx);
2667
2668        let fs = FakeFs::new(cx.executor());
2669        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2670            .await;
2671        let project = Project::test(fs.clone(), [], cx).await;
2672        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2673        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2674        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2675            move |_, thread, mut cx| {
2676                let read_file_tx = read_file_tx.clone();
2677                async move {
2678                    let content = thread
2679                        .update(&mut cx, |thread, cx| {
2680                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2681                        })
2682                        .unwrap()
2683                        .await
2684                        .unwrap();
2685                    assert_eq!(content, "one\ntwo\nthree\n");
2686                    read_file_tx.take().unwrap().send(()).unwrap();
2687                    thread
2688                        .update(&mut cx, |thread, cx| {
2689                            thread.write_text_file(
2690                                path!("/tmp/foo").into(),
2691                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2692                                cx,
2693                            )
2694                        })
2695                        .unwrap()
2696                        .await
2697                        .unwrap();
2698                    Ok(acp::PromptResponse {
2699                        stop_reason: acp::StopReason::EndTurn,
2700                        meta: None,
2701                    })
2702                }
2703                .boxed_local()
2704            },
2705        ));
2706
2707        let (worktree, pathbuf) = project
2708            .update(cx, |project, cx| {
2709                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2710            })
2711            .await
2712            .unwrap();
2713        let buffer = project
2714            .update(cx, |project, cx| {
2715                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2716            })
2717            .await
2718            .unwrap();
2719
2720        let thread = cx
2721            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2722            .await
2723            .unwrap();
2724
2725        let request = thread.update(cx, |thread, cx| {
2726            thread.send_raw("Extend the count in /tmp/foo", cx)
2727        });
2728        read_file_rx.await.ok();
2729        buffer.update(cx, |buffer, cx| {
2730            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2731        });
2732        cx.run_until_parked();
2733        assert_eq!(
2734            buffer.read_with(cx, |buffer, _| buffer.text()),
2735            "zero\none\ntwo\nthree\nfour\nfive\n"
2736        );
2737        assert_eq!(
2738            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2739            "zero\none\ntwo\nthree\nfour\nfive\n"
2740        );
2741        request.await.unwrap();
2742    }
2743
2744    #[gpui::test]
2745    async fn test_reading_from_line(cx: &mut TestAppContext) {
2746        init_test(cx);
2747
2748        let fs = FakeFs::new(cx.executor());
2749        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2750            .await;
2751        let project = Project::test(fs.clone(), [], cx).await;
2752        project
2753            .update(cx, |project, cx| {
2754                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2755            })
2756            .await
2757            .unwrap();
2758
2759        let connection = Rc::new(FakeAgentConnection::new());
2760
2761        let thread = cx
2762            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2763            .await
2764            .unwrap();
2765
2766        // Whole file
2767        let content = thread
2768            .update(cx, |thread, cx| {
2769                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2770            })
2771            .await
2772            .unwrap();
2773
2774        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2775
2776        // Only start line
2777        let content = thread
2778            .update(cx, |thread, cx| {
2779                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2780            })
2781            .await
2782            .unwrap();
2783
2784        assert_eq!(content, "three\nfour\n");
2785
2786        // Only limit
2787        let content = thread
2788            .update(cx, |thread, cx| {
2789                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2790            })
2791            .await
2792            .unwrap();
2793
2794        assert_eq!(content, "one\ntwo\n");
2795
2796        // Range
2797        let content = thread
2798            .update(cx, |thread, cx| {
2799                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2800            })
2801            .await
2802            .unwrap();
2803
2804        assert_eq!(content, "two\nthree\n");
2805
2806        // Invalid
2807        let err = thread
2808            .update(cx, |thread, cx| {
2809                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2810            })
2811            .await
2812            .unwrap_err();
2813
2814        assert_eq!(
2815            err.to_string(),
2816            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2817        );
2818    }
2819
2820    #[gpui::test]
2821    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2822        init_test(cx);
2823
2824        let fs = FakeFs::new(cx.executor());
2825        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2826        let project = Project::test(fs.clone(), [], cx).await;
2827        project
2828            .update(cx, |project, cx| {
2829                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2830            })
2831            .await
2832            .unwrap();
2833
2834        let connection = Rc::new(FakeAgentConnection::new());
2835
2836        let thread = cx
2837            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2838            .await
2839            .unwrap();
2840
2841        // Whole file
2842        let content = thread
2843            .update(cx, |thread, cx| {
2844                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2845            })
2846            .await
2847            .unwrap();
2848
2849        assert_eq!(content, "");
2850
2851        // Only start line
2852        let content = thread
2853            .update(cx, |thread, cx| {
2854                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2855            })
2856            .await
2857            .unwrap();
2858
2859        assert_eq!(content, "");
2860
2861        // Only limit
2862        let content = thread
2863            .update(cx, |thread, cx| {
2864                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2865            })
2866            .await
2867            .unwrap();
2868
2869        assert_eq!(content, "");
2870
2871        // Range
2872        let content = thread
2873            .update(cx, |thread, cx| {
2874                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2875            })
2876            .await
2877            .unwrap();
2878
2879        assert_eq!(content, "");
2880
2881        // Invalid
2882        let err = thread
2883            .update(cx, |thread, cx| {
2884                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2885            })
2886            .await
2887            .unwrap_err();
2888
2889        assert_eq!(
2890            err.to_string(),
2891            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2892        );
2893    }
2894    #[gpui::test]
2895    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2896        init_test(cx);
2897
2898        let fs = FakeFs::new(cx.executor());
2899        fs.insert_tree(path!("/tmp"), json!({})).await;
2900        let project = Project::test(fs.clone(), [], cx).await;
2901        project
2902            .update(cx, |project, cx| {
2903                project.find_or_create_worktree(path!("/tmp"), true, cx)
2904            })
2905            .await
2906            .unwrap();
2907
2908        let connection = Rc::new(FakeAgentConnection::new());
2909
2910        let thread = cx
2911            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2912            .await
2913            .unwrap();
2914
2915        // Out of project file
2916        let err = thread
2917            .update(cx, |thread, cx| {
2918                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2919            })
2920            .await
2921            .unwrap_err();
2922
2923        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2924    }
2925
2926    #[gpui::test]
2927    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2928        init_test(cx);
2929
2930        let fs = FakeFs::new(cx.executor());
2931        let project = Project::test(fs, [], cx).await;
2932        let id = acp::ToolCallId("test".into());
2933
2934        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2935            let id = id.clone();
2936            move |_, thread, mut cx| {
2937                let id = id.clone();
2938                async move {
2939                    thread
2940                        .update(&mut cx, |thread, cx| {
2941                            thread.handle_session_update(
2942                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2943                                    id: id.clone(),
2944                                    title: "Label".into(),
2945                                    kind: acp::ToolKind::Fetch,
2946                                    status: acp::ToolCallStatus::InProgress,
2947                                    content: vec![],
2948                                    locations: vec![],
2949                                    raw_input: None,
2950                                    raw_output: None,
2951                                    meta: None,
2952                                }),
2953                                cx,
2954                            )
2955                        })
2956                        .unwrap()
2957                        .unwrap();
2958                    Ok(acp::PromptResponse {
2959                        stop_reason: acp::StopReason::EndTurn,
2960                        meta: None,
2961                    })
2962                }
2963                .boxed_local()
2964            }
2965        }));
2966
2967        let thread = cx
2968            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2969            .await
2970            .unwrap();
2971
2972        let request = thread.update(cx, |thread, cx| {
2973            thread.send_raw("Fetch https://example.com", cx)
2974        });
2975
2976        run_until_first_tool_call(&thread, cx).await;
2977
2978        thread.read_with(cx, |thread, _| {
2979            assert!(matches!(
2980                thread.entries[1],
2981                AgentThreadEntry::ToolCall(ToolCall {
2982                    status: ToolCallStatus::InProgress,
2983                    ..
2984                })
2985            ));
2986        });
2987
2988        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2989
2990        thread.read_with(cx, |thread, _| {
2991            assert!(matches!(
2992                &thread.entries[1],
2993                AgentThreadEntry::ToolCall(ToolCall {
2994                    status: ToolCallStatus::Canceled,
2995                    ..
2996                })
2997            ));
2998        });
2999
3000        thread
3001            .update(cx, |thread, cx| {
3002                thread.handle_session_update(
3003                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3004                        id,
3005                        fields: acp::ToolCallUpdateFields {
3006                            status: Some(acp::ToolCallStatus::Completed),
3007                            ..Default::default()
3008                        },
3009                        meta: None,
3010                    }),
3011                    cx,
3012                )
3013            })
3014            .unwrap();
3015
3016        request.await.unwrap();
3017
3018        thread.read_with(cx, |thread, _| {
3019            assert!(matches!(
3020                thread.entries[1],
3021                AgentThreadEntry::ToolCall(ToolCall {
3022                    status: ToolCallStatus::Completed,
3023                    ..
3024                })
3025            ));
3026        });
3027    }
3028
3029    #[gpui::test]
3030    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3031        init_test(cx);
3032        let fs = FakeFs::new(cx.background_executor.clone());
3033        fs.insert_tree(path!("/test"), json!({})).await;
3034        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3035
3036        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3037            move |_, thread, mut cx| {
3038                async move {
3039                    thread
3040                        .update(&mut cx, |thread, cx| {
3041                            thread.handle_session_update(
3042                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3043                                    id: acp::ToolCallId("test".into()),
3044                                    title: "Label".into(),
3045                                    kind: acp::ToolKind::Edit,
3046                                    status: acp::ToolCallStatus::Completed,
3047                                    content: vec![acp::ToolCallContent::Diff {
3048                                        diff: acp::Diff {
3049                                            path: "/test/test.txt".into(),
3050                                            old_text: None,
3051                                            new_text: "foo".into(),
3052                                            meta: None,
3053                                        },
3054                                    }],
3055                                    locations: vec![],
3056                                    raw_input: None,
3057                                    raw_output: None,
3058                                    meta: None,
3059                                }),
3060                                cx,
3061                            )
3062                        })
3063                        .unwrap()
3064                        .unwrap();
3065                    Ok(acp::PromptResponse {
3066                        stop_reason: acp::StopReason::EndTurn,
3067                        meta: None,
3068                    })
3069                }
3070                .boxed_local()
3071            }
3072        }));
3073
3074        let thread = cx
3075            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3076            .await
3077            .unwrap();
3078
3079        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3080            .await
3081            .unwrap();
3082
3083        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3084    }
3085
3086    #[gpui::test(iterations = 10)]
3087    async fn test_checkpoints(cx: &mut TestAppContext) {
3088        init_test(cx);
3089        let fs = FakeFs::new(cx.background_executor.clone());
3090        fs.insert_tree(
3091            path!("/test"),
3092            json!({
3093                ".git": {}
3094            }),
3095        )
3096        .await;
3097        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3098
3099        let simulate_changes = Arc::new(AtomicBool::new(true));
3100        let next_filename = Arc::new(AtomicUsize::new(0));
3101        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3102            let simulate_changes = simulate_changes.clone();
3103            let next_filename = next_filename.clone();
3104            let fs = fs.clone();
3105            move |request, thread, mut cx| {
3106                let fs = fs.clone();
3107                let simulate_changes = simulate_changes.clone();
3108                let next_filename = next_filename.clone();
3109                async move {
3110                    if simulate_changes.load(SeqCst) {
3111                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3112                        fs.write(Path::new(&filename), b"").await?;
3113                    }
3114
3115                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3116                        panic!("expected text content block");
3117                    };
3118                    thread.update(&mut cx, |thread, cx| {
3119                        thread
3120                            .handle_session_update(
3121                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3122                                    content: content.text.to_uppercase().into(),
3123                                    meta: None,
3124                                }),
3125                                cx,
3126                            )
3127                            .unwrap();
3128                    })?;
3129                    Ok(acp::PromptResponse {
3130                        stop_reason: acp::StopReason::EndTurn,
3131                        meta: None,
3132                    })
3133                }
3134                .boxed_local()
3135            }
3136        }));
3137        let thread = cx
3138            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3139            .await
3140            .unwrap();
3141
3142        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3143            .await
3144            .unwrap();
3145        thread.read_with(cx, |thread, cx| {
3146            assert_eq!(
3147                thread.to_markdown(cx),
3148                indoc! {"
3149                    ## User (checkpoint)
3150
3151                    Lorem
3152
3153                    ## Assistant
3154
3155                    LOREM
3156
3157                "}
3158            );
3159        });
3160        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3161
3162        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3163            .await
3164            .unwrap();
3165        thread.read_with(cx, |thread, cx| {
3166            assert_eq!(
3167                thread.to_markdown(cx),
3168                indoc! {"
3169                    ## User (checkpoint)
3170
3171                    Lorem
3172
3173                    ## Assistant
3174
3175                    LOREM
3176
3177                    ## User (checkpoint)
3178
3179                    ipsum
3180
3181                    ## Assistant
3182
3183                    IPSUM
3184
3185                "}
3186            );
3187        });
3188        assert_eq!(
3189            fs.files(),
3190            vec![
3191                Path::new(path!("/test/file-0")),
3192                Path::new(path!("/test/file-1"))
3193            ]
3194        );
3195
3196        // Checkpoint isn't stored when there are no changes.
3197        simulate_changes.store(false, SeqCst);
3198        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3199            .await
3200            .unwrap();
3201        thread.read_with(cx, |thread, cx| {
3202            assert_eq!(
3203                thread.to_markdown(cx),
3204                indoc! {"
3205                    ## User (checkpoint)
3206
3207                    Lorem
3208
3209                    ## Assistant
3210
3211                    LOREM
3212
3213                    ## User (checkpoint)
3214
3215                    ipsum
3216
3217                    ## Assistant
3218
3219                    IPSUM
3220
3221                    ## User
3222
3223                    dolor
3224
3225                    ## Assistant
3226
3227                    DOLOR
3228
3229                "}
3230            );
3231        });
3232        assert_eq!(
3233            fs.files(),
3234            vec![
3235                Path::new(path!("/test/file-0")),
3236                Path::new(path!("/test/file-1"))
3237            ]
3238        );
3239
3240        // Rewinding the conversation truncates the history and restores the checkpoint.
3241        thread
3242            .update(cx, |thread, cx| {
3243                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3244                    panic!("unexpected entries {:?}", thread.entries)
3245                };
3246                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3247            })
3248            .await
3249            .unwrap();
3250        thread.read_with(cx, |thread, cx| {
3251            assert_eq!(
3252                thread.to_markdown(cx),
3253                indoc! {"
3254                    ## User (checkpoint)
3255
3256                    Lorem
3257
3258                    ## Assistant
3259
3260                    LOREM
3261
3262                "}
3263            );
3264        });
3265        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3266    }
3267
3268    #[gpui::test]
3269    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3270        use std::sync::atomic::AtomicUsize;
3271        init_test(cx);
3272
3273        let fs = FakeFs::new(cx.executor());
3274        let project = Project::test(fs, None, cx).await;
3275
3276        // Create a connection that simulates refusal after tool result
3277        let prompt_count = Arc::new(AtomicUsize::new(0));
3278        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3279            let prompt_count = prompt_count.clone();
3280            move |_request, thread, mut cx| {
3281                let count = prompt_count.fetch_add(1, SeqCst);
3282                async move {
3283                    if count == 0 {
3284                        // First prompt: Generate a tool call with result
3285                        thread.update(&mut cx, |thread, cx| {
3286                            thread
3287                                .handle_session_update(
3288                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3289                                        id: acp::ToolCallId("tool1".into()),
3290                                        title: "Test Tool".into(),
3291                                        kind: acp::ToolKind::Fetch,
3292                                        status: acp::ToolCallStatus::Completed,
3293                                        content: vec![],
3294                                        locations: vec![],
3295                                        raw_input: Some(serde_json::json!({"query": "test"})),
3296                                        raw_output: Some(
3297                                            serde_json::json!({"result": "inappropriate content"}),
3298                                        ),
3299                                        meta: None,
3300                                    }),
3301                                    cx,
3302                                )
3303                                .unwrap();
3304                        })?;
3305
3306                        // Now return refusal because of the tool result
3307                        Ok(acp::PromptResponse {
3308                            stop_reason: acp::StopReason::Refusal,
3309                            meta: None,
3310                        })
3311                    } else {
3312                        Ok(acp::PromptResponse {
3313                            stop_reason: acp::StopReason::EndTurn,
3314                            meta: None,
3315                        })
3316                    }
3317                }
3318                .boxed_local()
3319            }
3320        }));
3321
3322        let thread = cx
3323            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3324            .await
3325            .unwrap();
3326
3327        // Track if we see a Refusal event
3328        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3329        let saw_refusal_event_captured = saw_refusal_event.clone();
3330        thread.update(cx, |_thread, cx| {
3331            cx.subscribe(
3332                &thread,
3333                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3334                    if matches!(event, AcpThreadEvent::Refusal) {
3335                        *saw_refusal_event_captured.lock().unwrap() = true;
3336                    }
3337                },
3338            )
3339            .detach();
3340        });
3341
3342        // Send a user message - this will trigger tool call and then refusal
3343        let send_task = thread.update(cx, |thread, cx| {
3344            thread.send(
3345                vec![acp::ContentBlock::Text(acp::TextContent {
3346                    text: "Hello".into(),
3347                    annotations: None,
3348                    meta: None,
3349                })],
3350                cx,
3351            )
3352        });
3353        cx.background_executor.spawn(send_task).detach();
3354        cx.run_until_parked();
3355
3356        // Verify that:
3357        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3358        // 2. The user message was NOT truncated
3359        assert!(
3360            *saw_refusal_event.lock().unwrap(),
3361            "Refusal event should be emitted for tool result refusals"
3362        );
3363
3364        thread.read_with(cx, |thread, _| {
3365            let entries = thread.entries();
3366            assert!(entries.len() >= 2, "Should have user message and tool call");
3367
3368            // Verify user message is still there
3369            assert!(
3370                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3371                "User message should not be truncated"
3372            );
3373
3374            // Verify tool call is there with result
3375            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3376                assert!(
3377                    tool_call.raw_output.is_some(),
3378                    "Tool call should have output"
3379                );
3380            } else {
3381                panic!("Expected tool call at index 1");
3382            }
3383        });
3384    }
3385
3386    #[gpui::test]
3387    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3388        init_test(cx);
3389
3390        let fs = FakeFs::new(cx.executor());
3391        let project = Project::test(fs, None, cx).await;
3392
3393        let refuse_next = Arc::new(AtomicBool::new(false));
3394        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3395            let refuse_next = refuse_next.clone();
3396            move |_request, _thread, _cx| {
3397                if refuse_next.load(SeqCst) {
3398                    async move {
3399                        Ok(acp::PromptResponse {
3400                            stop_reason: acp::StopReason::Refusal,
3401                            meta: None,
3402                        })
3403                    }
3404                    .boxed_local()
3405                } else {
3406                    async move {
3407                        Ok(acp::PromptResponse {
3408                            stop_reason: acp::StopReason::EndTurn,
3409                            meta: None,
3410                        })
3411                    }
3412                    .boxed_local()
3413                }
3414            }
3415        }));
3416
3417        let thread = cx
3418            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3419            .await
3420            .unwrap();
3421
3422        // Track if we see a Refusal event
3423        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3424        let saw_refusal_event_captured = saw_refusal_event.clone();
3425        thread.update(cx, |_thread, cx| {
3426            cx.subscribe(
3427                &thread,
3428                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3429                    if matches!(event, AcpThreadEvent::Refusal) {
3430                        *saw_refusal_event_captured.lock().unwrap() = true;
3431                    }
3432                },
3433            )
3434            .detach();
3435        });
3436
3437        // Send a message that will be refused
3438        refuse_next.store(true, SeqCst);
3439        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3440            .await
3441            .unwrap();
3442
3443        // Verify that a Refusal event WAS emitted for user prompt refusal
3444        assert!(
3445            *saw_refusal_event.lock().unwrap(),
3446            "Refusal event should be emitted for user prompt refusals"
3447        );
3448
3449        // Verify the message was truncated (user prompt refusal)
3450        thread.read_with(cx, |thread, cx| {
3451            assert_eq!(thread.to_markdown(cx), "");
3452        });
3453    }
3454
3455    #[gpui::test]
3456    async fn test_refusal(cx: &mut TestAppContext) {
3457        init_test(cx);
3458        let fs = FakeFs::new(cx.background_executor.clone());
3459        fs.insert_tree(path!("/"), json!({})).await;
3460        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3461
3462        let refuse_next = Arc::new(AtomicBool::new(false));
3463        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3464            let refuse_next = refuse_next.clone();
3465            move |request, thread, mut cx| {
3466                let refuse_next = refuse_next.clone();
3467                async move {
3468                    if refuse_next.load(SeqCst) {
3469                        return Ok(acp::PromptResponse {
3470                            stop_reason: acp::StopReason::Refusal,
3471                            meta: None,
3472                        });
3473                    }
3474
3475                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3476                        panic!("expected text content block");
3477                    };
3478                    thread.update(&mut cx, |thread, cx| {
3479                        thread
3480                            .handle_session_update(
3481                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3482                                    content: content.text.to_uppercase().into(),
3483                                    meta: None,
3484                                }),
3485                                cx,
3486                            )
3487                            .unwrap();
3488                    })?;
3489                    Ok(acp::PromptResponse {
3490                        stop_reason: acp::StopReason::EndTurn,
3491                        meta: None,
3492                    })
3493                }
3494                .boxed_local()
3495            }
3496        }));
3497        let thread = cx
3498            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3499            .await
3500            .unwrap();
3501
3502        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3503            .await
3504            .unwrap();
3505        thread.read_with(cx, |thread, cx| {
3506            assert_eq!(
3507                thread.to_markdown(cx),
3508                indoc! {"
3509                    ## User
3510
3511                    hello
3512
3513                    ## Assistant
3514
3515                    HELLO
3516
3517                "}
3518            );
3519        });
3520
3521        // Simulate refusing the second message. The message should be truncated
3522        // when a user prompt is refused.
3523        refuse_next.store(true, SeqCst);
3524        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3525            .await
3526            .unwrap();
3527        thread.read_with(cx, |thread, cx| {
3528            assert_eq!(
3529                thread.to_markdown(cx),
3530                indoc! {"
3531                    ## User
3532
3533                    hello
3534
3535                    ## Assistant
3536
3537                    HELLO
3538
3539                "}
3540            );
3541        });
3542    }
3543
3544    async fn run_until_first_tool_call(
3545        thread: &Entity<AcpThread>,
3546        cx: &mut TestAppContext,
3547    ) -> usize {
3548        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3549
3550        let subscription = cx.update(|cx| {
3551            cx.subscribe(thread, move |thread, _, cx| {
3552                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3553                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3554                        return tx.try_send(ix).unwrap();
3555                    }
3556                }
3557            })
3558        });
3559
3560        select! {
3561            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3562                panic!("Timeout waiting for tool call")
3563            }
3564            ix = rx.next().fuse() => {
3565                drop(subscription);
3566                ix.unwrap()
3567            }
3568        }
3569    }
3570
3571    #[derive(Clone, Default)]
3572    struct FakeAgentConnection {
3573        auth_methods: Vec<acp::AuthMethod>,
3574        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3575        on_user_message: Option<
3576            Rc<
3577                dyn Fn(
3578                        acp::PromptRequest,
3579                        WeakEntity<AcpThread>,
3580                        AsyncApp,
3581                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3582                    + 'static,
3583            >,
3584        >,
3585    }
3586
3587    impl FakeAgentConnection {
3588        fn new() -> Self {
3589            Self {
3590                auth_methods: Vec::new(),
3591                on_user_message: None,
3592                sessions: Arc::default(),
3593            }
3594        }
3595
3596        #[expect(unused)]
3597        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3598            self.auth_methods = auth_methods;
3599            self
3600        }
3601
3602        fn on_user_message(
3603            mut self,
3604            handler: impl Fn(
3605                acp::PromptRequest,
3606                WeakEntity<AcpThread>,
3607                AsyncApp,
3608            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3609            + 'static,
3610        ) -> Self {
3611            self.on_user_message.replace(Rc::new(handler));
3612            self
3613        }
3614    }
3615
3616    impl AgentConnection for FakeAgentConnection {
3617        fn auth_methods(&self) -> &[acp::AuthMethod] {
3618            &self.auth_methods
3619        }
3620
3621        fn new_thread(
3622            self: Rc<Self>,
3623            project: Entity<Project>,
3624            _cwd: &Path,
3625            cx: &mut App,
3626        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3627            let session_id = acp::SessionId(
3628                rand::rng()
3629                    .sample_iter(&distr::Alphanumeric)
3630                    .take(7)
3631                    .map(char::from)
3632                    .collect::<String>()
3633                    .into(),
3634            );
3635            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3636            let thread = cx.new(|cx| {
3637                AcpThread::new(
3638                    "Test",
3639                    self.clone(),
3640                    project,
3641                    action_log,
3642                    session_id.clone(),
3643                    watch::Receiver::constant(acp::PromptCapabilities {
3644                        image: true,
3645                        audio: true,
3646                        embedded_context: true,
3647                        meta: None,
3648                    }),
3649                    cx,
3650                )
3651            });
3652            self.sessions.lock().insert(session_id, thread.downgrade());
3653            Task::ready(Ok(thread))
3654        }
3655
3656        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3657            if self.auth_methods().iter().any(|m| m.id == method) {
3658                Task::ready(Ok(()))
3659            } else {
3660                Task::ready(Err(anyhow!("Invalid Auth Method")))
3661            }
3662        }
3663
3664        fn prompt(
3665            &self,
3666            _id: Option<UserMessageId>,
3667            params: acp::PromptRequest,
3668            cx: &mut App,
3669        ) -> Task<gpui::Result<acp::PromptResponse>> {
3670            let sessions = self.sessions.lock();
3671            let thread = sessions.get(&params.session_id).unwrap();
3672            if let Some(handler) = &self.on_user_message {
3673                let handler = handler.clone();
3674                let thread = thread.clone();
3675                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3676            } else {
3677                Task::ready(Ok(acp::PromptResponse {
3678                    stop_reason: acp::StopReason::EndTurn,
3679                    meta: None,
3680                }))
3681            }
3682        }
3683
3684        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3685            let sessions = self.sessions.lock();
3686            let thread = sessions.get(session_id).unwrap().clone();
3687
3688            cx.spawn(async move |cx| {
3689                thread
3690                    .update(cx, |thread, cx| thread.cancel(cx))
3691                    .unwrap()
3692                    .await
3693            })
3694            .detach();
3695        }
3696
3697        fn truncate(
3698            &self,
3699            session_id: &acp::SessionId,
3700            _cx: &App,
3701        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3702            Some(Rc::new(FakeAgentSessionEditor {
3703                _session_id: session_id.clone(),
3704            }))
3705        }
3706
3707        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3708            self
3709        }
3710    }
3711
    /// Truncation handle returned by `FakeAgentConnection::truncate`.
    struct FakeAgentSessionEditor {
        // Session the handle was created for. It is stored but not read by the
        // `run` implementation below, hence the leading underscore.
        _session_id: acp::SessionId,
    }
3715
    impl AgentSessionTruncate for FakeAgentSessionEditor {
        /// No-op truncation: ignores the message id and the app context and
        /// returns an already-completed task holding `Ok(())`.
        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
3721
3722    #[gpui::test]
3723    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3724        init_test(cx);
3725
3726        let fs = FakeFs::new(cx.executor());
3727        let project = Project::test(fs, [], cx).await;
3728        let connection = Rc::new(FakeAgentConnection::new());
3729        let thread = cx
3730            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3731            .await
3732            .unwrap();
3733
3734        // Try to update a tool call that doesn't exist
3735        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3736        thread.update(cx, |thread, cx| {
3737            let result = thread.handle_session_update(
3738                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3739                    id: nonexistent_id.clone(),
3740                    fields: acp::ToolCallUpdateFields {
3741                        status: Some(acp::ToolCallStatus::Completed),
3742                        ..Default::default()
3743                    },
3744                    meta: None,
3745                }),
3746                cx,
3747            );
3748
3749            // The update should succeed (not return an error)
3750            assert!(result.is_ok());
3751
3752            // There should now be exactly one entry in the thread
3753            assert_eq!(thread.entries.len(), 1);
3754
3755            // The entry should be a failed tool call
3756            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3757                assert_eq!(tool_call.id, nonexistent_id);
3758                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3759                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3760
3761                // Check that the content contains the error message
3762                assert_eq!(tool_call.content.len(), 1);
3763                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3764                    match content_block {
3765                        ContentBlock::Markdown { markdown } => {
3766                            let markdown_text = markdown.read(cx).source();
3767                            assert!(markdown_text.contains("Tool call not found"));
3768                        }
3769                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3770                        ContentBlock::ResourceLink { .. } => {
3771                            panic!("Expected markdown content, got resource link")
3772                        }
3773                    }
3774                } else {
3775                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3776                }
3777            } else {
3778                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3779            }
3780        });
3781    }
3782}