acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use agent_settings::AgentSettings;
   7use collections::HashSet;
   8pub use connection::*;
   9pub use diff::*;
  10use language::language_settings::FormatOnSave;
  11pub use mention::*;
  12use project::lsp_store::{FormatTrigger, LspFormatTarget};
  13use serde::{Deserialize, Serialize};
  14use settings::Settings as _;
  15use task::{Shell, ShellBuilder};
  16pub use terminal::*;
  17
  18use action_log::ActionLog;
  19use agent_client_protocol::{self as acp};
  20use anyhow::{Context as _, Result, anyhow};
  21use editor::Bias;
  22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  24use itertools::Itertools;
  25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  26use markdown::Markdown;
  27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  28use std::collections::HashMap;
  29use std::error::Error;
  30use std::fmt::{Formatter, Write};
  31use std::ops::Range;
  32use std::process::ExitStatus;
  33use std::rc::Rc;
  34use std::time::{Duration, Instant};
  35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  36use ui::App;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
/// A message authored by the user, stored as one entry of a thread.
#[derive(Debug)]
pub struct UserMessage<T> {
    /// Identifier of the message, when one is available.
    pub id: Option<UserMessageId>,
    /// Content rendered below the `## User` heading by `to_markdown`.
    pub content: ContentBlock,
    /// Protocol-level content blocks belonging to this message.
    // NOTE(review): presumably the raw chunks that `content` was built from —
    // confirm against the code that constructs this struct.
    pub chunks: Vec<acp::ContentBlock<T>>,
    /// Checkpoint associated with this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  47
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the project's git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message's markdown heading reads
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
  53
  54impl<T> UserMessage<T> {
  55    fn to_markdown(&self, cx: &App) -> String {
  56        let mut markdown = String::new();
  57        if self
  58            .checkpoint
  59            .as_ref()
  60            .is_some_and(|checkpoint| checkpoint.show)
  61        {
  62            writeln!(markdown, "## User (checkpoint)").unwrap();
  63        } else {
  64            writeln!(markdown, "## User").unwrap();
  65        }
  66        writeln!(markdown).unwrap();
  67        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  68        writeln!(markdown).unwrap();
  69        markdown
  70    }
  71}
  72
/// A message produced by the assistant, made up of one or more chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks of this message, in the order they are rendered.
    pub chunks: Vec<AssistantMessageChunk>,
}
  77
  78impl AssistantMessage {
  79    pub fn to_markdown(&self, cx: &App) -> String {
  80        format!(
  81            "## Assistant\n\n{}\n\n",
  82            self.chunks
  83                .iter()
  84                .map(|chunk| chunk.to_markdown(cx))
  85                .join("\n\n")
  86        )
  87    }
  88}
  89
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; rendered to markdown as-is.
    Message { block: ContentBlock },
    /// Reasoning content; rendered to markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
  95
  96impl AssistantMessageChunk {
  97    pub fn from_str(
  98        chunk: &str,
  99        language_registry: &Arc<LanguageRegistry>,
 100        path_style: PathStyle,
 101        cx: &mut App,
 102    ) -> Self {
 103        Self::Message {
 104            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 105        }
 106    }
 107
 108    fn to_markdown(&self, cx: &App) -> String {
 109        match self {
 110            Self::Message { block } => block.to_markdown(cx).to_string(),
 111            Self::Thought { block } => {
 112                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 113            }
 114        }
 115    }
 116}
 117
/// A single entry of a thread: a user message, an assistant message, or a
/// tool call.
#[derive(Debug)]
pub enum AgentThreadEntry<T> {
    /// A message written by the user.
    UserMessage(UserMessage<T>),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation, including its content and status.
    ToolCall(ToolCall),
}
 124
 125impl<T> AgentThreadEntry<T> {
 126    pub fn to_markdown(&self, cx: &App) -> String {
 127        match self {
 128            Self::UserMessage(message) => message.to_markdown(cx),
 129            Self::AssistantMessage(message) => message.to_markdown(cx),
 130            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 131        }
 132    }
 133
 134    pub fn user_message(&self) -> Option<&UserMessage<T>> {
 135        if let AgentThreadEntry::UserMessage(message) = self {
 136            Some(message)
 137        } else {
 138            None
 139        }
 140    }
 141
 142    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 143        if let AgentThreadEntry::ToolCall(call) = self {
 144            itertools::Either::Left(call.diffs())
 145        } else {
 146            itertools::Either::Right(std::iter::empty())
 147        }
 148    }
 149
 150    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 151        if let AgentThreadEntry::ToolCall(call) = self {
 152            itertools::Either::Left(call.terminals())
 153        } else {
 154            itertools::Either::Right(std::iter::empty())
 155        }
 156    }
 157
 158    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 159        if let AgentThreadEntry::ToolCall(ToolCall {
 160            locations,
 161            resolved_locations,
 162            ..
 163        }) = self
 164        {
 165            Some((
 166                locations.get(ix)?.clone(),
 167                resolved_locations.get(ix)?.clone()?,
 168            ))
 169        } else {
 170            None
 171        }
 172    }
 173}
 174
/// A tool invocation shown in the thread, along with its content and status.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown label built from the tool call's title.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported by the protocol.
    pub kind: acp::ToolKind,
    /// Content items (content blocks, diffs, terminals) attached to the call.
    pub content: Vec<ToolCallContent>,
    /// Current lifecycle status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`; `AgentThreadEntry::location`
    /// reads both lists at the same index.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw JSON output of the call, if provided.
    pub raw_output: Option<serde_json::Value>,
}
 187
 188impl ToolCall {
 189    fn from_acp(
 190        tool_call: acp::ToolCall,
 191        status: ToolCallStatus,
 192        language_registry: Arc<LanguageRegistry>,
 193        path_style: PathStyle,
 194        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 195        cx: &mut App,
 196    ) -> Result<Self> {
 197        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 198            first_line.to_owned() + ""
 199        } else {
 200            tool_call.title
 201        };
 202        let mut content = Vec::with_capacity(tool_call.content.len());
 203        for item in tool_call.content {
 204            content.push(ToolCallContent::from_acp(
 205                item,
 206                language_registry.clone(),
 207                path_style,
 208                terminals,
 209                cx,
 210            )?);
 211        }
 212
 213        let result = Self {
 214            id: tool_call.id,
 215            label: cx
 216                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 217            kind: tool_call.kind,
 218            content,
 219            locations: tool_call.locations,
 220            resolved_locations: Vec::default(),
 221            status,
 222            raw_input: tool_call.raw_input,
 223            raw_output: tool_call.raw_output,
 224        };
 225        Ok(result)
 226    }
 227
 228    fn update_fields(
 229        &mut self,
 230        fields: acp::ToolCallUpdateFields,
 231        language_registry: Arc<LanguageRegistry>,
 232        path_style: PathStyle,
 233        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 234        cx: &mut App,
 235    ) -> Result<()> {
 236        let acp::ToolCallUpdateFields {
 237            kind,
 238            status,
 239            title,
 240            content,
 241            locations,
 242            raw_input,
 243            raw_output,
 244        } = fields;
 245
 246        if let Some(kind) = kind {
 247            self.kind = kind;
 248        }
 249
 250        if let Some(status) = status {
 251            self.status = status.into();
 252        }
 253
 254        if let Some(title) = title {
 255            self.label.update(cx, |label, cx| {
 256                if let Some((first_line, _)) = title.split_once("\n") {
 257                    label.replace(first_line.to_owned() + "", cx)
 258                } else {
 259                    label.replace(title, cx);
 260                }
 261            });
 262        }
 263
 264        if let Some(content) = content {
 265            let new_content_len = content.len();
 266            let mut content = content.into_iter();
 267
 268            // Reuse existing content if we can
 269            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 270                old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 271            }
 272            for new in content {
 273                self.content.push(ToolCallContent::from_acp(
 274                    new,
 275                    language_registry.clone(),
 276                    path_style,
 277                    terminals,
 278                    cx,
 279                )?)
 280            }
 281            self.content.truncate(new_content_len);
 282        }
 283
 284        if let Some(locations) = locations {
 285            self.locations = locations;
 286        }
 287
 288        if let Some(raw_input) = raw_input {
 289            self.raw_input = Some(raw_input);
 290        }
 291
 292        if let Some(raw_output) = raw_output {
 293            if self.content.is_empty()
 294                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 295            {
 296                self.content
 297                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 298                        markdown,
 299                    }));
 300            }
 301            self.raw_output = Some(raw_output);
 302        }
 303        Ok(())
 304    }
 305
 306    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Diff(diff) => Some(diff),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Terminal(_) => None,
 311        })
 312    }
 313
 314    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 315        self.content.iter().filter_map(|content| match content {
 316            ToolCallContent::Terminal(terminal) => Some(terminal),
 317            ToolCallContent::ContentBlock(_) => None,
 318            ToolCallContent::Diff(_) => None,
 319        })
 320    }
 321
 322    fn to_markdown(&self, cx: &App) -> String {
 323        let mut markdown = format!(
 324            "**Tool Call: {}**\nStatus: {}\n\n",
 325            self.label.read(cx).source(),
 326            self.status
 327        );
 328        for content in &self.content {
 329            markdown.push_str(content.to_markdown(cx).as_str());
 330            markdown.push_str("\n\n");
 331        }
 332        markdown
 333    }
 334
 335    async fn resolve_location(
 336        location: acp::ToolCallLocation,
 337        project: WeakEntity<Project>,
 338        cx: &mut AsyncApp,
 339    ) -> Option<ResolvedLocation> {
 340        let buffer = project
 341            .update(cx, |project, cx| {
 342                project
 343                    .project_path_for_absolute_path(&location.path, cx)
 344                    .map(|path| project.open_buffer(path, cx))
 345            })
 346            .ok()??;
 347        let buffer = buffer.await.log_err()?;
 348        let position = buffer
 349            .update(cx, |buffer, _| {
 350                if let Some(row) = location.line {
 351                    let snapshot = buffer.snapshot();
 352                    let column = snapshot.indent_size_for_line(row).len;
 353                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 354                    snapshot.anchor_before(point)
 355                } else {
 356                    Anchor::MIN
 357                }
 358            })
 359            .ok()?;
 360
 361        Some(ResolvedLocation { buffer, position })
 362    }
 363
 364    fn resolve_locations(
 365        &self,
 366        project: Entity<Project>,
 367        cx: &mut App,
 368    ) -> Task<Vec<Option<ResolvedLocation>>> {
 369        let locations = self.locations.clone();
 370        project.update(cx, |_, cx| {
 371            cx.spawn(async move |project, cx| {
 372                let mut new_locations = Vec::new();
 373                for location in locations {
 374                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 375                }
 376                new_locations
 377            })
 378        })
 379    }
 380}
 381
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool-call location resolved to an open buffer and a position in it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    /// Anchor within `buffer` that the location refers to.
    position: Anchor,
}
 389
 390impl From<&ResolvedLocation> for AgentLocation {
 391    fn from(value: &ResolvedLocation) -> Self {
 392        Self {
 393            buffer: value.buffer.downgrade(),
 394            position: value.position,
 395        }
 396    }
 397}
 398
/// Lifecycle status of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// Permission options offered to the user.
        options: Vec<acp::PermissionOption>,
        /// One-shot channel carrying the id of the chosen option.
        // NOTE(review): presumably consumed when the user picks an option —
        // confirm at the call site that resolves the confirmation.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 420
 421impl From<acp::ToolCallStatus> for ToolCallStatus {
 422    fn from(status: acp::ToolCallStatus) -> Self {
 423        match status {
 424            acp::ToolCallStatus::Pending => Self::Pending,
 425            acp::ToolCallStatus::InProgress => Self::InProgress,
 426            acp::ToolCallStatus::Completed => Self::Completed,
 427            acp::ToolCallStatus::Failed => Self::Failed,
 428        }
 429    }
 430}
 431
 432impl Display for ToolCallStatus {
 433    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 434        write!(
 435            f,
 436            "{}",
 437            match self {
 438                ToolCallStatus::Pending => "Pending",
 439                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 440                ToolCallStatus::InProgress => "In Progress",
 441                ToolCallStatus::Completed => "Completed",
 442                ToolCallStatus::Failed => "Failed",
 443                ToolCallStatus::Rejected => "Rejected",
 444                ToolCallStatus::Canceled => "Canceled",
 445            }
 446        )
 447    }
 448}
 449
/// Displayable content accumulated from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; further appended blocks extend this markdown.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link, kept as-is until more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
 456
 457impl ContentBlock {
 458    pub fn new(
 459        block: acp::ContentBlock,
 460        language_registry: &Arc<LanguageRegistry>,
 461        path_style: PathStyle,
 462        cx: &mut App,
 463    ) -> Self {
 464        let mut this = Self::Empty;
 465        this.append(block, language_registry, path_style, cx);
 466        this
 467    }
 468
 469    pub fn new_combined(
 470        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 471        language_registry: Arc<LanguageRegistry>,
 472        path_style: PathStyle,
 473        cx: &mut App,
 474    ) -> Self {
 475        let mut this = Self::Empty;
 476        for block in blocks {
 477            this.append(block, &language_registry, path_style, cx);
 478        }
 479        this
 480    }
 481
 482    pub fn append(
 483        &mut self,
 484        block: acp::ContentBlock,
 485        language_registry: &Arc<LanguageRegistry>,
 486        path_style: PathStyle,
 487        cx: &mut App,
 488    ) {
 489        if matches!(self, ContentBlock::Empty)
 490            && let acp::ContentBlock::ResourceLink(resource_link) = block
 491        {
 492            *self = ContentBlock::ResourceLink { resource_link };
 493            return;
 494        }
 495
 496        let new_content = self.block_string_contents(block, path_style);
 497
 498        match self {
 499            ContentBlock::Empty => {
 500                *self = Self::create_markdown_block(new_content, language_registry, cx);
 501            }
 502            ContentBlock::Markdown { markdown } => {
 503                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 504            }
 505            ContentBlock::ResourceLink { resource_link } => {
 506                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 507                let combined = format!("{}\n{}", existing_content, new_content);
 508
 509                *self = Self::create_markdown_block(combined, language_registry, cx);
 510            }
 511        }
 512    }
 513
 514    fn create_markdown_block(
 515        content: String,
 516        language_registry: &Arc<LanguageRegistry>,
 517        cx: &mut App,
 518    ) -> ContentBlock {
 519        ContentBlock::Markdown {
 520            markdown: cx
 521                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 522        }
 523    }
 524
 525    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 526        match block {
 527            acp::ContentBlock::Text(text_content) => text_content.text,
 528            acp::ContentBlock::ResourceLink(resource_link) => {
 529                Self::resource_link_md(&resource_link.uri, path_style)
 530            }
 531            acp::ContentBlock::Resource(acp::EmbeddedResource {
 532                resource:
 533                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 534                        uri,
 535                        ..
 536                    }),
 537                ..
 538            }) => Self::resource_link_md(&uri, path_style),
 539            acp::ContentBlock::Image(image) => Self::image_md(&image),
 540            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 541        }
 542    }
 543
 544    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 545        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 546            uri.as_link().to_string()
 547        } else {
 548            uri.to_string()
 549        }
 550    }
 551
 552    fn image_md(_image: &acp::ImageContent) -> String {
 553        "`Image`".into()
 554    }
 555
 556    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 557        match self {
 558            ContentBlock::Empty => "",
 559            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 560            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 561        }
 562    }
 563
 564    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 565        match self {
 566            ContentBlock::Empty => None,
 567            ContentBlock::Markdown { markdown } => Some(markdown),
 568            ContentBlock::ResourceLink { .. } => None,
 569        }
 570    }
 571
 572    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 573        match self {
 574            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 575            _ => None,
 576        }
 577    }
 578}
 579
/// One content item attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Markdown or resource-link content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 586
 587impl ToolCallContent {
 588    pub fn from_acp(
 589        content: acp::ToolCallContent,
 590        language_registry: Arc<LanguageRegistry>,
 591        path_style: PathStyle,
 592        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 593        cx: &mut App,
 594    ) -> Result<Self> {
 595        match content {
 596            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 597                content,
 598                &language_registry,
 599                path_style,
 600                cx,
 601            ))),
 602            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 603                Diff::finalized(
 604                    diff.path.to_string_lossy().into_owned(),
 605                    diff.old_text,
 606                    diff.new_text,
 607                    language_registry,
 608                    cx,
 609                )
 610            }))),
 611            acp::ToolCallContent::Terminal { terminal_id } => terminals
 612                .get(&terminal_id)
 613                .cloned()
 614                .map(Self::Terminal)
 615                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 616        }
 617    }
 618
 619    pub fn update_from_acp(
 620        &mut self,
 621        new: acp::ToolCallContent,
 622        language_registry: Arc<LanguageRegistry>,
 623        path_style: PathStyle,
 624        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 625        cx: &mut App,
 626    ) -> Result<()> {
 627        let needs_update = match (&self, &new) {
 628            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 629                old_diff.read(cx).needs_update(
 630                    new_diff.old_text.as_deref().unwrap_or(""),
 631                    &new_diff.new_text,
 632                    cx,
 633                )
 634            }
 635            _ => true,
 636        };
 637
 638        if needs_update {
 639            *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
 640        }
 641        Ok(())
 642    }
 643
 644    pub fn to_markdown(&self, cx: &App) -> String {
 645        match self {
 646            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 647            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 648            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 649        }
 650    }
 651}
 652
/// An update targeting an existing tool call, identified by its id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol-level update of the tool call's fields.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 659
 660impl ToolCallUpdate {
 661    fn id(&self) -> &acp::ToolCallId {
 662        match self {
 663            Self::UpdateFields(update) => &update.id,
 664            Self::UpdateDiff(diff) => &diff.id,
 665            Self::UpdateTerminal(terminal) => &terminal.id,
 666        }
 667    }
 668}
 669
 670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 671    fn from(update: acp::ToolCallUpdate) -> Self {
 672        Self::UpdateFields(update)
 673    }
 674}
 675
 676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 677    fn from(diff: ToolCallUpdateDiff) -> Self {
 678        Self::UpdateDiff(diff)
 679    }
 680}
 681
/// A tool-call update that carries a diff.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
 687
 688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 689    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 690        Self::UpdateTerminal(terminal)
 691    }
 692}
 693
/// A tool-call update that carries a terminal.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
 699
/// A list of plan entries; the default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// The entries of the plan, in order.
    pub entries: Vec<PlanEntry>,
}
 704
/// Summary of a [`Plan`] as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 711
 712impl Plan {
 713    pub fn is_empty(&self) -> bool {
 714        self.entries.is_empty()
 715    }
 716
 717    pub fn stats(&self) -> PlanStats<'_> {
 718        let mut stats = PlanStats {
 719            in_progress_entry: None,
 720            pending: 0,
 721            completed: 0,
 722        };
 723
 724        for entry in &self.entries {
 725            match &entry.status {
 726                acp::PlanEntryStatus::Pending => {
 727                    stats.pending += 1;
 728                }
 729                acp::PlanEntryStatus::InProgress => {
 730                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 731                }
 732                acp::PlanEntryStatus::Completed => {
 733                    stats.completed += 1;
 734                }
 735            }
 736        }
 737
 738        stats
 739    }
 740}
 741
/// One entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as reported by the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status of the entry, as reported by the protocol.
    pub status: acp::PlanEntryStatus,
}
 748
 749impl PlanEntry {
 750    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 751        Self {
 752            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 753            priority: entry.priority,
 754            status: entry.status,
 755        }
 756    }
 757}
 758
/// Token consumption of a thread relative to a maximum.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` is treated by `ratio` as "unknown".
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
 764
 765impl TokenUsage {
 766    pub fn ratio(&self) -> TokenUsageRatio {
 767        #[cfg(debug_assertions)]
 768        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 769            .unwrap_or("0.8".to_string())
 770            .parse()
 771            .unwrap();
 772        #[cfg(not(debug_assertions))]
 773        let warning_threshold: f32 = 0.8;
 774
 775        // When the maximum is unknown because there is no selected model,
 776        // avoid showing the token limit warning.
 777        if self.max_tokens == 0 {
 778            TokenUsageRatio::Normal
 779        } else if self.used_tokens >= self.max_tokens {
 780            TokenUsageRatio::Exceeded
 781        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 782            TokenUsageRatio::Warning
 783        } else {
 784            TokenUsageRatio::Normal
 785        }
 786    }
 787}
 788
/// Classification of token usage produced by `TokenUsage::ratio`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
 795
/// Information about a retry, carried by `AcpThreadEvent::Retry`.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// Attempt counter.
    // NOTE(review): whether this is zero- or one-based is not visible here —
    // confirm at the site that constructs this struct.
    pub attempt: usize,
    /// Upper bound on the number of attempts.
    pub max_attempts: usize,
    /// Instant associated with the start of the retry.
    pub started_at: Instant,
    /// Duration associated with the retry.
    // NOTE(review): presumably the delay before the next attempt — confirm.
    pub duration: Duration,
}
 804
/// Marker type used as the type parameter of the entries stored in
/// `AcpThread` (`AgentThreadEntry<AnchoredText>`).
pub struct AnchoredText;
 806
/// State of one agent thread: its entries, plan, terminals, and the
/// connection and session it belongs to.
pub struct AcpThread<T = SharedString> {
    /// Title of the thread.
    title: T,
    /// Thread entries (user messages, assistant messages, tool calls).
    entries: Vec<AgentThreadEntry<AnchoredText>>,
    /// The current plan.
    plan: Plan,
    /// The project this thread operates on.
    project: Entity<Project>,
    /// Action log entity associated with the thread.
    action_log: Entity<ActionLog>,
    /// Snapshots keyed by buffer.
    // NOTE(review): presumably the snapshot taken when a buffer was shared
    // with the agent — confirm against the code that fills this map.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task of the in-flight send, if any.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Protocol session id of this thread.
    session_id: acp::SessionId,
    /// Latest known token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Prompt capabilities reported for this thread.
    prompt_capabilities: acp::PromptCapabilities,
    /// Held so the capability-observing task stays alive with the thread.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, keyed by terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminal ids not yet in `terminals`;
    /// replayed when the terminal's `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminal ids not yet in `terminals`;
    /// consumed when the terminal's `Created` event arrives.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
 824
/// Events emitted by an `AcpThread`.
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was added to the thread.
    NewEntry,
    /// The thread title changed.
    TitleUpdated,
    /// The token usage changed.
    TokenUsageUpdated,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    /// The entries in the given index range were removed.
    EntriesRemoved(Range<usize>),
    /// A tool call requires authorization.
    ToolAuthorizationRequired,
    /// A retry is happening; carries its status.
    Retry(RetryStatus),
    /// The thread stopped.
    Stopped,
    /// An error occurred.
    Error,
    /// Loading failed with the given error.
    LoadError(LoadError),
    /// The prompt capabilities changed.
    PromptCapabilitiesUpdated,
    /// A refusal occurred.
    // NOTE(review): presumably the agent refused the request — confirm at
    // the emit site.
    Refusal,
    /// The list of available commands changed; carries the new list.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The session mode changed; carries the new mode id.
    ModeUpdated(acp::SessionModeId),
}
 842
/// Declares `AcpThread` as an emitter of [`AcpThreadEvent`]s.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 844
/// Events about a terminal, handled by
/// `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread.
    Created {
        /// Id under which the terminal is tracked.
        terminal_id: acp::TerminalId,
        /// Label for the terminal.
        label: String,
        /// Working directory of the terminal, if known.
        cwd: Option<PathBuf>,
        /// Optional limit on the amount of output, in bytes.
        output_byte_limit: Option<u64>,
        /// The underlying terminal entity.
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered by the thread if the terminal
    /// is not registered yet.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed; ignored if the terminal is not
    /// registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited; remembered by the thread if the terminal is not
    /// registered yet.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
 867
/// Commands addressed to a terminal, identified by its id.
// NOTE(review): the consumer of these commands is not visible in this part
// of the file — confirm who handles them.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes as input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize to the given number of columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
 883
 884impl AcpThread {
 885    pub fn on_terminal_provider_event(
 886        &mut self,
 887        event: TerminalProviderEvent,
 888        cx: &mut Context<Self>,
 889    ) {
 890        match event {
 891            TerminalProviderEvent::Created {
 892                terminal_id,
 893                label,
 894                cwd,
 895                output_byte_limit,
 896                terminal,
 897            } => {
 898                let entity = self.register_terminal_created(
 899                    terminal_id.clone(),
 900                    label,
 901                    cwd,
 902                    output_byte_limit,
 903                    terminal,
 904                    cx,
 905                );
 906
 907                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 908                    for data in chunks.drain(..) {
 909                        entity.update(cx, |term, cx| {
 910                            term.inner().update(cx, |inner, cx| {
 911                                inner.write_output(&data, cx);
 912                            })
 913                        });
 914                    }
 915                }
 916
 917                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 918                    entity.update(cx, |_term, cx| {
 919                        cx.notify();
 920                    });
 921                }
 922
 923                cx.notify();
 924            }
 925            TerminalProviderEvent::Output { terminal_id, data } => {
 926                if let Some(entity) = self.terminals.get(&terminal_id) {
 927                    entity.update(cx, |term, cx| {
 928                        term.inner().update(cx, |inner, cx| {
 929                            inner.write_output(&data, cx);
 930                        })
 931                    });
 932                } else {
 933                    self.pending_terminal_output
 934                        .entry(terminal_id)
 935                        .or_default()
 936                        .push(data);
 937                }
 938            }
 939            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 940                if let Some(entity) = self.terminals.get(&terminal_id) {
 941                    entity.update(cx, |term, cx| {
 942                        term.inner().update(cx, |inner, cx| {
 943                            inner.breadcrumb_text = title;
 944                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 945                        })
 946                    });
 947                }
 948            }
 949            TerminalProviderEvent::Exit {
 950                terminal_id,
 951                status,
 952            } => {
 953                if let Some(entity) = self.terminals.get(&terminal_id) {
 954                    entity.update(cx, |_term, cx| {
 955                        cx.notify();
 956                    });
 957                } else {
 958                    self.pending_terminal_exit.insert(terminal_id, status);
 959                }
 960            }
 961        }
 962    }
 963}
 964
 965#[derive(PartialEq, Eq, Debug)]
 966pub enum ThreadStatus {
 967    Idle,
 968    Generating,
 969}
 970
/// Error describing why loading failed.
///
/// The `Display` impl defines the message shown for each variant.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// `command` reported `current_version`, but at least `minimum_version` is needed.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, carried as a free-form message.
    Other(SharedString),
}
 984
 985impl Display for LoadError {
 986    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 987        match self {
 988            LoadError::Unsupported {
 989                command: path,
 990                current_version,
 991                minimum_version,
 992            } => {
 993                write!(
 994                    f,
 995                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
 996                )
 997            }
 998            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
 999            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1000            LoadError::Other(msg) => write!(f, "{msg}"),
1001        }
1002    }
1003}
1004
// Marker impl: relies on the `Debug` and `Display` impls; no source error is exposed.
impl Error for LoadError {}
1006
1007impl<T> AcpThread<T> {
1008    pub fn new(
1009        title: impl Into<SharedString>,
1010        connection: Rc<dyn AgentConnection>,
1011        project: Entity<Project>,
1012        action_log: Entity<ActionLog>,
1013        session_id: acp::SessionId,
1014        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1015        cx: &mut Context<Self>,
1016    ) -> Self {
1017        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1018        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1019            loop {
1020                let caps = prompt_capabilities_rx.recv().await?;
1021                this.update(cx, |this, cx| {
1022                    this.prompt_capabilities = caps;
1023                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1024                })?;
1025            }
1026        });
1027
1028        Self {
1029            action_log,
1030            shared_buffers: Default::default(),
1031            entries: Default::default(),
1032            plan: Default::default(),
1033            title: title.into(),
1034            project,
1035            send_task: None,
1036            connection,
1037            session_id,
1038            token_usage: None,
1039            prompt_capabilities,
1040            _observe_prompt_capabilities: task,
1041            terminals: HashMap::default(),
1042            pending_terminal_output: HashMap::default(),
1043            pending_terminal_exit: HashMap::default(),
1044        }
1045    }
1046
    /// Returns a clone of the prompt capabilities currently stored on the thread.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1050
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1054
    /// Returns the action log entity held by this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1058
    /// Returns the project entity held by this thread.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1062
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1066
    /// Returns all entries of the thread, in the order they were pushed.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1070
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1074
1075    pub fn status(&self) -> ThreadStatus {
1076        if self.send_task.is_some() {
1077            ThreadStatus::Generating
1078        } else {
1079            ThreadStatus::Idle
1080        }
1081    }
1082
    /// Returns the most recently stored token usage, if any has been set.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1086
1087    pub fn has_pending_edit_tool_calls(&self) -> bool {
1088        for entry in self.entries.iter().rev() {
1089            match entry {
1090                AgentThreadEntry::UserMessage(_) => return false,
1091                AgentThreadEntry::ToolCall(
1092                    call @ ToolCall {
1093                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1094                        ..
1095                    },
1096                ) if call.diffs().next().is_some() => {
1097                    return true;
1098                }
1099                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1100            }
1101        }
1102
1103        false
1104    }
1105
1106    pub fn used_tools_since_last_user_message(&self) -> bool {
1107        for entry in self.entries.iter().rev() {
1108            match entry {
1109                AgentThreadEntry::UserMessage(..) => return false,
1110                AgentThreadEntry::AssistantMessage(..) => continue,
1111                AgentThreadEntry::ToolCall(..) => return true,
1112            }
1113        }
1114
1115        false
1116    }
1117
1118    pub fn handle_session_update(
1119        &mut self,
1120        update: acp::SessionUpdate,
1121        cx: &mut Context<Self>,
1122    ) -> Result<(), acp::Error> {
1123        match update {
1124            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1125                self.push_user_content_block(None, content, cx);
1126            }
1127            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1128                self.push_assistant_content_block(content, false, cx);
1129            }
1130            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1131                self.push_assistant_content_block(content, true, cx);
1132            }
1133            acp::SessionUpdate::ToolCall(tool_call) => {
1134                self.upsert_tool_call(tool_call, cx)?;
1135            }
1136            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1137                self.update_tool_call(tool_call_update, cx)?;
1138            }
1139            acp::SessionUpdate::Plan(plan) => {
1140                self.update_plan(plan, cx);
1141            }
1142            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1143                available_commands,
1144                ..
1145            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1146            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1147                current_mode_id,
1148                ..
1149            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1150        }
1151        Ok(())
1152    }
1153
1154    pub fn push_user_content_block(
1155        &mut self,
1156        message_id: Option<UserMessageId>,
1157        chunk: acp::ContentBlock<T>,
1158        cx: &mut Context<Self>,
1159    ) {
1160        let language_registry = self.project.read(cx).languages().clone();
1161        let path_style = self.project.read(cx).path_style(cx);
1162        let entries_len = self.entries.len();
1163
1164        if let Some(last_entry) = self.entries.last_mut()
1165            && let AgentThreadEntry::UserMessage(UserMessage {
1166                id,
1167                content,
1168                chunks,
1169                ..
1170            }) = last_entry
1171        {
1172            *id = message_id.or(id.take());
1173            content.append(chunk.clone(), &language_registry, path_style, cx);
1174            chunks.push(chunk);
1175            let idx = entries_len - 1;
1176            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1177        } else {
1178            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1179            self.push_entry(
1180                AgentThreadEntry::UserMessage(UserMessage {
1181                    id: message_id,
1182                    content,
1183                    chunks: vec![chunk],
1184                    checkpoint: None,
1185                }),
1186                cx,
1187            );
1188        }
1189    }
1190
1191    pub fn push_assistant_content_block(
1192        &mut self,
1193        chunk: acp::ContentBlock,
1194        is_thought: bool,
1195        cx: &mut Context<Self>,
1196    ) {
1197        let language_registry = self.project.read(cx).languages().clone();
1198        let path_style = self.project.read(cx).path_style(cx);
1199        let entries_len = self.entries.len();
1200        if let Some(last_entry) = self.entries.last_mut()
1201            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1202        {
1203            let idx = entries_len - 1;
1204            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1205            match (chunks.last_mut(), is_thought) {
1206                (Some(AssistantMessageChunk::Message { block }), false)
1207                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1208                    block.append(chunk, &language_registry, path_style, cx)
1209                }
1210                _ => {
1211                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1212                    if is_thought {
1213                        chunks.push(AssistantMessageChunk::Thought { block })
1214                    } else {
1215                        chunks.push(AssistantMessageChunk::Message { block })
1216                    }
1217                }
1218            }
1219        } else {
1220            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1221            let chunk = if is_thought {
1222                AssistantMessageChunk::Thought { block }
1223            } else {
1224                AssistantMessageChunk::Message { block }
1225            };
1226
1227            self.push_entry(
1228                AgentThreadEntry::AssistantMessage(AssistantMessage {
1229                    chunks: vec![chunk],
1230                }),
1231                cx,
1232            );
1233        }
1234    }
1235
    /// Appends `entry` to the transcript and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry<T>, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1240
    /// Returns `true` when the connection's `set_title` yields a handler for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1244
1245    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1246        if title != self.title {
1247            self.title = title.clone();
1248            cx.emit(AcpThreadEvent::TitleUpdated);
1249            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1250                return set_title.run(title, cx);
1251            }
1252        }
1253        Task::ready(Ok(()))
1254    }
1255
    /// Replaces the stored token usage (which may be `None`) and emits `TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1260
    /// Emits `AcpThreadEvent::Retry` carrying `status`; no thread state is stored.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1264
1265    pub fn update_tool_call(
1266        &mut self,
1267        update: impl Into<ToolCallUpdate>,
1268        cx: &mut Context<Self>,
1269    ) -> Result<()> {
1270        let update = update.into();
1271        let languages = self.project.read(cx).languages().clone();
1272        let path_style = self.project.read(cx).path_style(cx);
1273
1274        let ix = match self.index_for_tool_call(update.id()) {
1275            Some(ix) => ix,
1276            None => {
1277                // Tool call not found - create a failed tool call entry
1278                let failed_tool_call = ToolCall {
1279                    id: update.id().clone(),
1280                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1281                    kind: acp::ToolKind::Fetch,
1282                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1283                        acp::ContentBlock::Text(acp::TextContent {
1284                            text: "Tool call not found".to_string(),
1285                            annotations: None,
1286                            meta: None,
1287                        }),
1288                        &languages,
1289                        path_style,
1290                        cx,
1291                    ))],
1292                    status: ToolCallStatus::Failed,
1293                    locations: Vec::new(),
1294                    resolved_locations: Vec::new(),
1295                    raw_input: None,
1296                    raw_output: None,
1297                };
1298                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1299                return Ok(());
1300            }
1301        };
1302        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1303            unreachable!()
1304        };
1305
1306        match update {
1307            ToolCallUpdate::UpdateFields(update) => {
1308                let location_updated = update.fields.locations.is_some();
1309                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1310                if location_updated {
1311                    self.resolve_locations(update.id, cx);
1312                }
1313            }
1314            ToolCallUpdate::UpdateDiff(update) => {
1315                call.content.clear();
1316                call.content.push(ToolCallContent::Diff(update.diff));
1317            }
1318            ToolCallUpdate::UpdateTerminal(update) => {
1319                call.content.clear();
1320                call.content
1321                    .push(ToolCallContent::Terminal(update.terminal));
1322            }
1323        }
1324
1325        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1326
1327        Ok(())
1328    }
1329
1330    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1331    pub fn upsert_tool_call(
1332        &mut self,
1333        tool_call: acp::ToolCall,
1334        cx: &mut Context<Self>,
1335    ) -> Result<(), acp::Error> {
1336        let status = tool_call.status.into();
1337        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1338    }
1339
1340    /// Fails if id does not match an existing entry.
1341    pub fn upsert_tool_call_inner(
1342        &mut self,
1343        update: acp::ToolCallUpdate,
1344        status: ToolCallStatus,
1345        cx: &mut Context<Self>,
1346    ) -> Result<(), acp::Error> {
1347        let language_registry = self.project.read(cx).languages().clone();
1348        let path_style = self.project.read(cx).path_style(cx);
1349        let id = update.id.clone();
1350
1351        if let Some(ix) = self.index_for_tool_call(&id) {
1352            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1353                unreachable!()
1354            };
1355
1356            call.update_fields(
1357                update.fields,
1358                language_registry,
1359                path_style,
1360                &self.terminals,
1361                cx,
1362            )?;
1363            call.status = status;
1364
1365            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1366        } else {
1367            let call = ToolCall::from_acp(
1368                update.try_into()?,
1369                status,
1370                language_registry,
1371                self.project.read(cx).path_style(cx),
1372                &self.terminals,
1373                cx,
1374            )?;
1375            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1376        };
1377
1378        self.resolve_locations(id, cx);
1379        Ok(())
1380    }
1381
1382    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1383        self.entries
1384            .iter()
1385            .enumerate()
1386            .rev()
1387            .find_map(|(index, entry)| {
1388                if let AgentThreadEntry::ToolCall(tool_call) = entry
1389                    && &tool_call.id == id
1390                {
1391                    Some(index)
1392                } else {
1393                    None
1394                }
1395            })
1396    }
1397
1398    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1399        // The tool call we are looking for is typically the last one, or very close to the end.
1400        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1401        self.entries
1402            .iter_mut()
1403            .enumerate()
1404            .rev()
1405            .find_map(|(index, tool_call)| {
1406                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1407                    && &tool_call.id == id
1408                {
1409                    Some((index, tool_call))
1410                } else {
1411                    None
1412                }
1413            })
1414    }
1415
1416    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1417        self.entries
1418            .iter()
1419            .enumerate()
1420            .rev()
1421            .find_map(|(index, tool_call)| {
1422                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1423                    && &tool_call.id == id
1424                {
1425                    Some((index, tool_call))
1426                } else {
1427                    None
1428                }
1429            })
1430    }
1431
1432    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1433        let project = self.project.clone();
1434        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1435            return;
1436        };
1437        let task = tool_call.resolve_locations(project, cx);
1438        cx.spawn(async move |this, cx| {
1439            let resolved_locations = task.await;
1440
1441            this.update(cx, |this, cx| {
1442                let project = this.project.clone();
1443
1444                for location in resolved_locations.iter().flatten() {
1445                    this.shared_buffers
1446                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1447                }
1448                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1449                    return;
1450                };
1451
1452                if let Some(Some(location)) = resolved_locations.last() {
1453                    project.update(cx, |project, cx| {
1454                        let should_ignore = if let Some(agent_location) = project
1455                            .agent_location()
1456                            .filter(|agent_location| agent_location.buffer == location.buffer)
1457                        {
1458                            let snapshot = location.buffer.read(cx).snapshot();
1459                            let old_position = agent_location.position.to_point(&snapshot);
1460                            let new_position = location.position.to_point(&snapshot);
1461
1462                            // ignore this so that when we get updates from the edit tool
1463                            // the position doesn't reset to the startof line
1464                            old_position.row == new_position.row
1465                                && old_position.column > new_position.column
1466                        } else {
1467                            false
1468                        };
1469                        if !should_ignore {
1470                            project.set_agent_location(Some(location.into()), cx);
1471                        }
1472                    });
1473                }
1474
1475                let resolved_locations = resolved_locations
1476                    .iter()
1477                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1478                    .collect::<Vec<_>>();
1479
1480                if tool_call.resolved_locations != resolved_locations {
1481                    tool_call.resolved_locations = resolved_locations;
1482                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1483                }
1484            })
1485        })
1486        .detach();
1487    }
1488
1489    pub fn request_tool_call_authorization(
1490        &mut self,
1491        tool_call: acp::ToolCallUpdate,
1492        options: Vec<acp::PermissionOption>,
1493        respect_always_allow_setting: bool,
1494        cx: &mut Context<Self>,
1495    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1496        let (tx, rx) = oneshot::channel();
1497
1498        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1499            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1500            // some tools would (incorrectly) continue to auto-accept.
1501            if let Some(allow_once_option) = options.iter().find_map(|option| {
1502                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1503                    Some(option.id.clone())
1504                } else {
1505                    None
1506                }
1507            }) {
1508                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1509                return Ok(async {
1510                    acp::RequestPermissionOutcome::Selected {
1511                        option_id: allow_once_option,
1512                    }
1513                }
1514                .boxed());
1515            }
1516        }
1517
1518        let status = ToolCallStatus::WaitingForConfirmation {
1519            options,
1520            respond_tx: tx,
1521        };
1522
1523        self.upsert_tool_call_inner(tool_call, status, cx)?;
1524        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1525
1526        let fut = async {
1527            match rx.await {
1528                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1529                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1530            }
1531        }
1532        .boxed();
1533
1534        Ok(fut)
1535    }
1536
1537    pub fn authorize_tool_call(
1538        &mut self,
1539        id: acp::ToolCallId,
1540        option_id: acp::PermissionOptionId,
1541        option_kind: acp::PermissionOptionKind,
1542        cx: &mut Context<Self>,
1543    ) {
1544        let Some((ix, call)) = self.tool_call_mut(&id) else {
1545            return;
1546        };
1547
1548        let new_status = match option_kind {
1549            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1550                ToolCallStatus::Rejected
1551            }
1552            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1553                ToolCallStatus::InProgress
1554            }
1555        };
1556
1557        let curr_status = mem::replace(&mut call.status, new_status);
1558
1559        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1560            respond_tx.send(option_id).log_err();
1561        } else if cfg!(debug_assertions) {
1562            panic!("tried to authorize an already authorized tool call");
1563        }
1564
1565        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1566    }
1567
1568    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1569        let mut first_tool_call = None;
1570
1571        for entry in self.entries.iter().rev() {
1572            match &entry {
1573                AgentThreadEntry::ToolCall(call) => {
1574                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1575                        first_tool_call = Some(call);
1576                    } else {
1577                        continue;
1578                    }
1579                }
1580                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1581                    // Reached the beginning of the turn.
1582                    // If we had pending permission requests in the previous turn, they have been cancelled.
1583                    break;
1584                }
1585            }
1586        }
1587
1588        first_tool_call
1589    }
1590
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1594
1595    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1596        let new_entries_len = request.entries.len();
1597        let mut new_entries = request.entries.into_iter();
1598
1599        // Reuse existing markdown to prevent flickering
1600        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1601            let PlanEntry {
1602                content,
1603                priority,
1604                status,
1605            } = old;
1606            content.update(cx, |old, cx| {
1607                old.replace(new.content, cx);
1608            });
1609            *priority = new.priority;
1610            *status = new.status;
1611        }
1612        for new in new_entries {
1613            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1614        }
1615        self.plan.entries.truncate(new_entries_len);
1616
1617        cx.notify();
1618    }
1619
    /// Drops every plan entry whose status is `Completed` and notifies observers.
    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
        self.plan
            .entries
            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
        cx.notify();
    }
1626
1627    #[cfg(any(test, feature = "test-support"))]
1628    pub fn send_raw(
1629        &mut self,
1630        message: &str,
1631        cx: &mut Context<Self>,
1632    ) -> BoxFuture<'static, Result<()>> {
1633        self.send(
1634            vec![acp::ContentBlock::Text(acp::TextContent {
1635                text: message.to_string(),
1636                annotations: None,
1637                meta: None,
1638            })],
1639            cx,
1640        )
1641    }
1642
1643    pub fn send(
1644        &mut self,
1645        message: Vec<acp::ContentBlock>,
1646        cx: &mut Context<Self>,
1647    ) -> BoxFuture<'static, Result<()>> {
1648        let block = ContentBlock::new_combined(
1649            message.clone(),
1650            self.project.read(cx).languages().clone(),
1651            self.project.read(cx).path_style(cx),
1652            cx,
1653        );
1654        let request = acp::PromptRequest {
1655            prompt: message.clone(),
1656            session_id: self.session_id.clone(),
1657            meta: None,
1658        };
1659        let git_store = self.project.read(cx).git_store().clone();
1660
1661        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1662            Some(UserMessageId::new())
1663        } else {
1664            None
1665        };
1666
1667        self.run_turn(cx, async move |this, cx| {
1668            this.update(cx, |this, cx| {
1669                this.push_entry(
1670                    AgentThreadEntry::UserMessage(UserMessage {
1671                        id: message_id.clone(),
1672                        content: block,
1673                        chunks: message,
1674                        checkpoint: None,
1675                    }),
1676                    cx,
1677                );
1678            })
1679            .ok();
1680
1681            let old_checkpoint = git_store
1682                .update(cx, |git, cx| git.checkpoint(cx))?
1683                .await
1684                .context("failed to get old checkpoint")
1685                .log_err();
1686            this.update(cx, |this, cx| {
1687                if let Some((_ix, message)) = this.last_user_message() {
1688                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1689                        git_checkpoint,
1690                        show: false,
1691                    });
1692                }
1693                this.connection.prompt(message_id, request, cx)
1694            })?
1695            .await
1696        })
1697    }
1698
    /// Returns `true` when the connection's `resume` yields a handler for this session.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1702
    /// Starts a turn that resumes the session through the connection's resume handler.
    ///
    /// The turn's closure fails with "resuming a session is not supported" when the
    /// connection returns no resume handler for this session.
    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .resume(&this.session_id, cx)
                    .map(|resume| resume.run(cx))
            })?
            // `None` here means the connection has no resume support.
            .context("resuming a session is not supported")?
            .await
        })
    }
1714
1715    fn run_turn(
1716        &mut self,
1717        cx: &mut Context<Self>,
1718        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1719    ) -> BoxFuture<'static, Result<()>> {
1720        self.clear_completed_plan_entries(cx);
1721
1722        let (tx, rx) = oneshot::channel();
1723        let cancel_task = self.cancel(cx);
1724
1725        self.send_task = Some(cx.spawn(async move |this, cx| {
1726            cancel_task.await;
1727            tx.send(f(this, cx).await).ok();
1728        }));
1729
1730        cx.spawn(async move |this, cx| {
1731            let response = rx.await;
1732
1733            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1734                .await?;
1735
1736            this.update(cx, |this, cx| {
1737                this.project
1738                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1739                match response {
1740                    Ok(Err(e)) => {
1741                        this.send_task.take();
1742                        cx.emit(AcpThreadEvent::Error);
1743                        Err(e)
1744                    }
1745                    result => {
1746                        let canceled = matches!(
1747                            result,
1748                            Ok(Ok(acp::PromptResponse {
1749                                stop_reason: acp::StopReason::Cancelled,
1750                                meta: None,
1751                            }))
1752                        );
1753
1754                        // We only take the task if the current prompt wasn't canceled.
1755                        //
1756                        // This prompt may have been canceled because another one was sent
1757                        // while it was still generating. In these cases, dropping `send_task`
1758                        // would cause the next generation to be canceled.
1759                        if !canceled {
1760                            this.send_task.take();
1761                        }
1762
1763                        // Handle refusal - distinguish between user prompt and tool call refusals
1764                        if let Ok(Ok(acp::PromptResponse {
1765                            stop_reason: acp::StopReason::Refusal,
1766                            meta: _,
1767                        })) = result
1768                        {
1769                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1770                                // Check if there's a completed tool call with results after the last user message
1771                                // This indicates the refusal is in response to tool output, not the user's prompt
1772                                let has_completed_tool_call_after_user_msg =
1773                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1774                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1775                                            // Check if the tool call has completed and has output
1776                                            matches!(tool_call.status, ToolCallStatus::Completed)
1777                                                && tool_call.raw_output.is_some()
1778                                        } else {
1779                                            false
1780                                        }
1781                                    });
1782
1783                                if has_completed_tool_call_after_user_msg {
1784                                    // Refusal is due to tool output - don't truncate, just notify
1785                                    // The model refused based on what the tool returned
1786                                    cx.emit(AcpThreadEvent::Refusal);
1787                                } else {
1788                                    // User prompt was refused - truncate back to before the user message
1789                                    let range = user_msg_ix..this.entries.len();
1790                                    if range.start < range.end {
1791                                        this.entries.truncate(user_msg_ix);
1792                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1793                                    }
1794                                    cx.emit(AcpThreadEvent::Refusal);
1795                                }
1796                            } else {
1797                                // No user message found, treat as general refusal
1798                                cx.emit(AcpThreadEvent::Refusal);
1799                            }
1800                        }
1801
1802                        cx.emit(AcpThreadEvent::Stopped);
1803                        Ok(())
1804                    }
1805                }
1806            })?
1807        })
1808        .boxed()
1809    }
1810
1811    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1812        let Some(send_task) = self.send_task.take() else {
1813            return Task::ready(());
1814        };
1815
1816        for entry in self.entries.iter_mut() {
1817            if let AgentThreadEntry::ToolCall(call) = entry {
1818                let cancel = matches!(
1819                    call.status,
1820                    ToolCallStatus::Pending
1821                        | ToolCallStatus::WaitingForConfirmation { .. }
1822                        | ToolCallStatus::InProgress
1823                );
1824
1825                if cancel {
1826                    call.status = ToolCallStatus::Canceled;
1827                }
1828            }
1829        }
1830
1831        self.connection.cancel(&self.session_id, cx);
1832
1833        // Wait for the send task to complete
1834        cx.foreground_executor().spawn(send_task)
1835    }
1836
1837    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1838    pub fn restore_checkpoint(
1839        &mut self,
1840        id: UserMessageId,
1841        cx: &mut Context<Self>,
1842    ) -> Task<Result<()>> {
1843        let Some((_, message)) = self.user_message_mut(&id) else {
1844            return Task::ready(Err(anyhow!("message not found")));
1845        };
1846
1847        let checkpoint = message
1848            .checkpoint
1849            .as_ref()
1850            .map(|c| c.git_checkpoint.clone());
1851        let rewind = self.rewind(id.clone(), cx);
1852        let git_store = self.project.read(cx).git_store().clone();
1853
1854        cx.spawn(async move |_, cx| {
1855            rewind.await?;
1856            if let Some(checkpoint) = checkpoint {
1857                git_store
1858                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1859                    .await?;
1860            }
1861
1862            Ok(())
1863        })
1864    }
1865
1866    /// Rewinds this thread to before the entry at `index`, removing it and all
1867    /// subsequent entries while rejecting any action_log changes made from that point.
1868    /// Unlike `restore_checkpoint`, this method does not restore from git.
1869    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1870        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1871            return Task::ready(Err(anyhow!("not supported")));
1872        };
1873
1874        cx.spawn(async move |this, cx| {
1875            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1876            this.update(cx, |this, cx| {
1877                if let Some((ix, _)) = this.user_message_mut(&id) {
1878                    let range = ix..this.entries.len();
1879                    this.entries.truncate(ix);
1880                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1881                }
1882                this.action_log()
1883                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1884            })?
1885            .await;
1886            Ok(())
1887        })
1888    }
1889
1890    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1891        let git_store = self.project.read(cx).git_store().clone();
1892
1893        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1894            if let Some(checkpoint) = message.checkpoint.as_ref() {
1895                checkpoint.git_checkpoint.clone()
1896            } else {
1897                return Task::ready(Ok(()));
1898            }
1899        } else {
1900            return Task::ready(Ok(()));
1901        };
1902
1903        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1904        cx.spawn(async move |this, cx| {
1905            let new_checkpoint = new_checkpoint
1906                .await
1907                .context("failed to get new checkpoint")
1908                .log_err();
1909            if let Some(new_checkpoint) = new_checkpoint {
1910                let equal = git_store
1911                    .update(cx, |git, cx| {
1912                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1913                    })?
1914                    .await
1915                    .unwrap_or(true);
1916                this.update(cx, |this, cx| {
1917                    let (ix, message) = this.last_user_message().context("no user message")?;
1918                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1919                    checkpoint.show = !equal;
1920                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1921                    anyhow::Ok(())
1922                })??;
1923            }
1924
1925            Ok(())
1926        })
1927    }
1928
1929    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage<T>)> {
1930        self.entries
1931            .iter_mut()
1932            .enumerate()
1933            .rev()
1934            .find_map(|(ix, entry)| {
1935                if let AgentThreadEntry::UserMessage(message) = entry {
1936                    Some((ix, message))
1937                } else {
1938                    None
1939                }
1940            })
1941    }
1942
1943    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1944        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1945            if let AgentThreadEntry::UserMessage(message) = entry {
1946                if message.id.as_ref() == Some(id) {
1947                    Some((ix, message))
1948                } else {
1949                    None
1950                }
1951            } else {
1952                None
1953            }
1954        })
1955    }
1956
1957    pub fn read_text_file(
1958        &self,
1959        path: PathBuf,
1960        line: Option<u32>,
1961        limit: Option<u32>,
1962        reuse_shared_snapshot: bool,
1963        cx: &mut Context<Self>,
1964    ) -> Task<Result<String, acp::Error>> {
1965        // Args are 1-based, move to 0-based
1966        let line = line.unwrap_or_default().saturating_sub(1);
1967        let limit = limit.unwrap_or(u32::MAX);
1968        let project = self.project.clone();
1969        let action_log = self.action_log.clone();
1970        cx.spawn(async move |this, cx| {
1971            let load = project
1972                .update(cx, |project, cx| {
1973                    let path = project
1974                        .project_path_for_absolute_path(&path, cx)
1975                        .ok_or_else(|| {
1976                            acp::Error::resource_not_found(Some(path.display().to_string()))
1977                        })?;
1978                    Ok(project.open_buffer(path, cx))
1979                })
1980                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1981                .flatten()?;
1982
1983            let buffer = load.await?;
1984
1985            let snapshot = if reuse_shared_snapshot {
1986                this.read_with(cx, |this, _| {
1987                    this.shared_buffers.get(&buffer.clone()).cloned()
1988                })
1989                .log_err()
1990                .flatten()
1991            } else {
1992                None
1993            };
1994
1995            let snapshot = if let Some(snapshot) = snapshot {
1996                snapshot
1997            } else {
1998                action_log.update(cx, |action_log, cx| {
1999                    action_log.buffer_read(buffer.clone(), cx);
2000                })?;
2001
2002                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2003                this.update(cx, |this, _| {
2004                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2005                })?;
2006                snapshot
2007            };
2008
2009            let max_point = snapshot.max_point();
2010            let start_position = Point::new(line, 0);
2011
2012            if start_position > max_point {
2013                return Err(acp::Error::invalid_params().with_data(format!(
2014                    "Attempting to read beyond the end of the file, line {}:{}",
2015                    max_point.row + 1,
2016                    max_point.column
2017                )));
2018            }
2019
2020            let start = snapshot.anchor_before(start_position);
2021            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2022
2023            project.update(cx, |project, cx| {
2024                project.set_agent_location(
2025                    Some(AgentLocation {
2026                        buffer: buffer.downgrade(),
2027                        position: start,
2028                    }),
2029                    cx,
2030                );
2031            })?;
2032
2033            Ok(snapshot.text_for_range(start..end).collect::<String>())
2034        })
2035    }
2036
2037    pub fn write_text_file(
2038        &self,
2039        path: PathBuf,
2040        content: String,
2041        cx: &mut Context<Self>,
2042    ) -> Task<Result<()>> {
2043        let project = self.project.clone();
2044        let action_log = self.action_log.clone();
2045        cx.spawn(async move |this, cx| {
2046            let load = project.update(cx, |project, cx| {
2047                let path = project
2048                    .project_path_for_absolute_path(&path, cx)
2049                    .context("invalid path")?;
2050                anyhow::Ok(project.open_buffer(path, cx))
2051            });
2052            let buffer = load??.await?;
2053            let snapshot = this.update(cx, |this, cx| {
2054                this.shared_buffers
2055                    .get(&buffer)
2056                    .cloned()
2057                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2058            })?;
2059            let edits = cx
2060                .background_executor()
2061                .spawn(async move {
2062                    let old_text = snapshot.text();
2063                    text_diff(old_text.as_str(), &content)
2064                        .into_iter()
2065                        .map(|(range, replacement)| {
2066                            (
2067                                snapshot.anchor_after(range.start)
2068                                    ..snapshot.anchor_before(range.end),
2069                                replacement,
2070                            )
2071                        })
2072                        .collect::<Vec<_>>()
2073                })
2074                .await;
2075
2076            project.update(cx, |project, cx| {
2077                project.set_agent_location(
2078                    Some(AgentLocation {
2079                        buffer: buffer.downgrade(),
2080                        position: edits
2081                            .last()
2082                            .map(|(range, _)| range.end)
2083                            .unwrap_or(Anchor::MIN),
2084                    }),
2085                    cx,
2086                );
2087            })?;
2088
2089            let format_on_save = cx.update(|cx| {
2090                action_log.update(cx, |action_log, cx| {
2091                    action_log.buffer_read(buffer.clone(), cx);
2092                });
2093
2094                let format_on_save = buffer.update(cx, |buffer, cx| {
2095                    buffer.edit(edits, None, cx);
2096
2097                    let settings = language::language_settings::language_settings(
2098                        buffer.language().map(|l| l.name()),
2099                        buffer.file(),
2100                        cx,
2101                    );
2102
2103                    settings.format_on_save != FormatOnSave::Off
2104                });
2105                action_log.update(cx, |action_log, cx| {
2106                    action_log.buffer_edited(buffer.clone(), cx);
2107                });
2108                format_on_save
2109            })?;
2110
2111            if format_on_save {
2112                let format_task = project.update(cx, |project, cx| {
2113                    project.format(
2114                        HashSet::from_iter([buffer.clone()]),
2115                        LspFormatTarget::Buffers,
2116                        false,
2117                        FormatTrigger::Save,
2118                        cx,
2119                    )
2120                })?;
2121                format_task.await.log_err();
2122
2123                action_log.update(cx, |action_log, cx| {
2124                    action_log.buffer_edited(buffer.clone(), cx);
2125                })?;
2126            }
2127
2128            project
2129                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2130                .await
2131        })
2132    }
2133
2134    pub fn create_terminal(
2135        &self,
2136        command: String,
2137        args: Vec<String>,
2138        extra_env: Vec<acp::EnvVariable>,
2139        cwd: Option<PathBuf>,
2140        output_byte_limit: Option<u64>,
2141        cx: &mut Context<Self>,
2142    ) -> Task<Result<Entity<Terminal>>> {
2143        let env = match &cwd {
2144            Some(dir) => self.project.update(cx, |project, cx| {
2145                project.environment().update(cx, |env, cx| {
2146                    env.directory_environment(dir.as_path().into(), cx)
2147                })
2148            }),
2149            None => Task::ready(None).shared(),
2150        };
2151        let env = cx.spawn(async move |_, _| {
2152            let mut env = env.await.unwrap_or_default();
2153            // Disables paging for `git` and hopefully other commands
2154            env.insert("PAGER".into(), "".into());
2155            for var in extra_env {
2156                env.insert(var.name, var.value);
2157            }
2158            env
2159        });
2160
2161        let project = self.project.clone();
2162        let language_registry = project.read(cx).languages().clone();
2163        let is_windows = project.read(cx).path_style(cx).is_windows();
2164
2165        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2166        let terminal_task = cx.spawn({
2167            let terminal_id = terminal_id.clone();
2168            async move |_this, cx| {
2169                let env = env.await;
2170                let shell = project
2171                    .update(cx, |project, cx| {
2172                        project
2173                            .remote_client()
2174                            .and_then(|r| r.read(cx).default_system_shell())
2175                    })?
2176                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2177                let (task_command, task_args) =
2178                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2179                        .redirect_stdin_to_dev_null()
2180                        .build(Some(command.clone()), &args);
2181                let terminal = project
2182                    .update(cx, |project, cx| {
2183                        project.create_terminal_task(
2184                            task::SpawnInTerminal {
2185                                command: Some(task_command),
2186                                args: task_args,
2187                                cwd: cwd.clone(),
2188                                env,
2189                                ..Default::default()
2190                            },
2191                            cx,
2192                        )
2193                    })?
2194                    .await?;
2195
2196                cx.new(|cx| {
2197                    Terminal::new(
2198                        terminal_id,
2199                        &format!("{} {}", command, args.join(" ")),
2200                        cwd,
2201                        output_byte_limit.map(|l| l as usize),
2202                        terminal,
2203                        language_registry,
2204                        cx,
2205                    )
2206                })
2207            }
2208        });
2209
2210        cx.spawn(async move |this, cx| {
2211            let terminal = terminal_task.await?;
2212            this.update(cx, |this, _cx| {
2213                this.terminals.insert(terminal_id, terminal.clone());
2214                terminal
2215            })
2216        })
2217    }
2218
2219    pub fn kill_terminal(
2220        &mut self,
2221        terminal_id: acp::TerminalId,
2222        cx: &mut Context<Self>,
2223    ) -> Result<()> {
2224        self.terminals
2225            .get(&terminal_id)
2226            .context("Terminal not found")?
2227            .update(cx, |terminal, cx| {
2228                terminal.kill(cx);
2229            });
2230
2231        Ok(())
2232    }
2233
2234    pub fn release_terminal(
2235        &mut self,
2236        terminal_id: acp::TerminalId,
2237        cx: &mut Context<Self>,
2238    ) -> Result<()> {
2239        self.terminals
2240            .remove(&terminal_id)
2241            .context("Terminal not found")?
2242            .update(cx, |terminal, cx| {
2243                terminal.kill(cx);
2244            });
2245
2246        Ok(())
2247    }
2248
2249    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2250        self.terminals
2251            .get(&terminal_id)
2252            .context("Terminal not found")
2253            .cloned()
2254    }
2255
2256    pub fn to_markdown(&self, cx: &App) -> String {
2257        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2258    }
2259
2260    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2261        cx.emit(AcpThreadEvent::LoadError(error));
2262    }
2263
2264    pub fn register_terminal_created(
2265        &mut self,
2266        terminal_id: acp::TerminalId,
2267        command_label: String,
2268        working_dir: Option<PathBuf>,
2269        output_byte_limit: Option<u64>,
2270        terminal: Entity<::terminal::Terminal>,
2271        cx: &mut Context<Self>,
2272    ) -> Entity<Terminal> {
2273        let language_registry = self.project.read(cx).languages().clone();
2274
2275        let entity = cx.new(|cx| {
2276            Terminal::new(
2277                terminal_id.clone(),
2278                &command_label,
2279                working_dir.clone(),
2280                output_byte_limit.map(|l| l as usize),
2281                terminal,
2282                language_registry,
2283                cx,
2284            )
2285        });
2286        self.terminals.insert(terminal_id.clone(), entity.clone());
2287        entity
2288    }
2289}
2290
2291fn markdown_for_raw_output(
2292    raw_output: &serde_json::Value,
2293    language_registry: &Arc<LanguageRegistry>,
2294    cx: &mut App,
2295) -> Option<Entity<Markdown>> {
2296    match raw_output {
2297        serde_json::Value::Null => None,
2298        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2299            Markdown::new(
2300                value.to_string().into(),
2301                Some(language_registry.clone()),
2302                None,
2303                cx,
2304            )
2305        })),
2306        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2307            Markdown::new(
2308                value.to_string().into(),
2309                Some(language_registry.clone()),
2310                None,
2311                cx,
2312            )
2313        })),
2314        serde_json::Value::String(value) => Some(cx.new(|cx| {
2315            Markdown::new(
2316                value.clone().into(),
2317                Some(language_registry.clone()),
2318                None,
2319                cx,
2320            )
2321        })),
2322        value => Some(cx.new(|cx| {
2323            Markdown::new(
2324                format!("```json\n{}\n```", value).into(),
2325                Some(language_registry.clone()),
2326                None,
2327                cx,
2328            )
2329        })),
2330    }
2331}
2332
2333#[cfg(test)]
2334mod tests {
2335    use super::*;
2336    use anyhow::anyhow;
2337    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2338    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2339    use indoc::indoc;
2340    use project::{FakeFs, Fs};
2341    use rand::{distr, prelude::*};
2342    use serde_json::json;
2343    use settings::SettingsStore;
2344    use smol::stream::StreamExt as _;
2345    use std::{
2346        any::Any,
2347        cell::RefCell,
2348        path::Path,
2349        rc::Rc,
2350        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2351        time::Duration,
2352    };
2353    use util::path;
2354
2355    fn init_test(cx: &mut TestAppContext) {
2356        env_logger::try_init().ok();
2357        cx.update(|cx| {
2358            let settings_store = SettingsStore::test(cx);
2359            cx.set_global(settings_store);
2360            Project::init_settings(cx);
2361            language::init(cx);
2362        });
2363    }
2364
2365    #[gpui::test]
2366    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2367        init_test(cx);
2368
2369        let fs = FakeFs::new(cx.executor());
2370        let project = Project::test(fs, [], cx).await;
2371        let connection = Rc::new(FakeAgentConnection::new());
2372        let thread = cx
2373            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2374            .await
2375            .unwrap();
2376
2377        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2378
2379        // Send Output BEFORE Created - should be buffered by acp_thread
2380        thread.update(cx, |thread, cx| {
2381            thread.on_terminal_provider_event(
2382                TerminalProviderEvent::Output {
2383                    terminal_id: terminal_id.clone(),
2384                    data: b"hello buffered".to_vec(),
2385                },
2386                cx,
2387            );
2388        });
2389
2390        // Create a display-only terminal and then send Created
2391        let lower = cx.new(|cx| {
2392            let builder = ::terminal::TerminalBuilder::new_display_only(
2393                ::terminal::terminal_settings::CursorShape::default(),
2394                ::terminal::terminal_settings::AlternateScroll::On,
2395                None,
2396                0,
2397            )
2398            .unwrap();
2399            builder.subscribe(cx)
2400        });
2401
2402        thread.update(cx, |thread, cx| {
2403            thread.on_terminal_provider_event(
2404                TerminalProviderEvent::Created {
2405                    terminal_id: terminal_id.clone(),
2406                    label: "Buffered Test".to_string(),
2407                    cwd: None,
2408                    output_byte_limit: None,
2409                    terminal: lower.clone(),
2410                },
2411                cx,
2412            );
2413        });
2414
2415        // After Created, buffered Output should have been flushed into the renderer
2416        let content = thread.read_with(cx, |thread, cx| {
2417            let term = thread.terminal(terminal_id.clone()).unwrap();
2418            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2419        });
2420
2421        assert!(
2422            content.contains("hello buffered"),
2423            "expected buffered output to render, got: {content}"
2424        );
2425    }
2426
2427    #[gpui::test]
2428    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2429        init_test(cx);
2430
2431        let fs = FakeFs::new(cx.executor());
2432        let project = Project::test(fs, [], cx).await;
2433        let connection = Rc::new(FakeAgentConnection::new());
2434        let thread = cx
2435            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2436            .await
2437            .unwrap();
2438
2439        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2440
2441        // Send Output BEFORE Created
2442        thread.update(cx, |thread, cx| {
2443            thread.on_terminal_provider_event(
2444                TerminalProviderEvent::Output {
2445                    terminal_id: terminal_id.clone(),
2446                    data: b"pre-exit data".to_vec(),
2447                },
2448                cx,
2449            );
2450        });
2451
2452        // Send Exit BEFORE Created
2453        thread.update(cx, |thread, cx| {
2454            thread.on_terminal_provider_event(
2455                TerminalProviderEvent::Exit {
2456                    terminal_id: terminal_id.clone(),
2457                    status: acp::TerminalExitStatus {
2458                        exit_code: Some(0),
2459                        signal: None,
2460                        meta: None,
2461                    },
2462                },
2463                cx,
2464            );
2465        });
2466
2467        // Now create a display-only lower-level terminal and send Created
2468        let lower = cx.new(|cx| {
2469            let builder = ::terminal::TerminalBuilder::new_display_only(
2470                ::terminal::terminal_settings::CursorShape::default(),
2471                ::terminal::terminal_settings::AlternateScroll::On,
2472                None,
2473                0,
2474            )
2475            .unwrap();
2476            builder.subscribe(cx)
2477        });
2478
2479        thread.update(cx, |thread, cx| {
2480            thread.on_terminal_provider_event(
2481                TerminalProviderEvent::Created {
2482                    terminal_id: terminal_id.clone(),
2483                    label: "Buffered Exit Test".to_string(),
2484                    cwd: None,
2485                    output_byte_limit: None,
2486                    terminal: lower.clone(),
2487                },
2488                cx,
2489            );
2490        });
2491
2492        // Output should be present after Created (flushed from buffer)
2493        let content = thread.read_with(cx, |thread, cx| {
2494            let term = thread.terminal(terminal_id.clone()).unwrap();
2495            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2496        });
2497
2498        assert!(
2499            content.contains("pre-exit data"),
2500            "expected pre-exit data to render, got: {content}"
2501        );
2502    }
2503
2504    #[gpui::test]
2505    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2506        init_test(cx);
2507
2508        let fs = FakeFs::new(cx.executor());
2509        let project = Project::test(fs, [], cx).await;
2510        let connection = Rc::new(FakeAgentConnection::new());
2511        let thread = cx
2512            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2513            .await
2514            .unwrap();
2515
2516        // Test creating a new user message
2517        thread.update(cx, |thread, cx| {
2518            thread.push_user_content_block(
2519                None,
2520                acp::ContentBlock::Text(acp::TextContent {
2521                    annotations: None,
2522                    text: "Hello, ".to_string(),
2523                    meta: None,
2524                }),
2525                cx,
2526            );
2527        });
2528
2529        thread.update(cx, |thread, cx| {
2530            assert_eq!(thread.entries.len(), 1);
2531            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2532                assert_eq!(user_msg.id, None);
2533                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2534            } else {
2535                panic!("Expected UserMessage");
2536            }
2537        });
2538
2539        // Test appending to existing user message
2540        let message_1_id = UserMessageId::new();
2541        thread.update(cx, |thread, cx| {
2542            thread.push_user_content_block(
2543                Some(message_1_id.clone()),
2544                acp::ContentBlock::Text(acp::TextContent {
2545                    annotations: None,
2546                    text: "world!".to_string(),
2547                    meta: None,
2548                }),
2549                cx,
2550            );
2551        });
2552
2553        thread.update(cx, |thread, cx| {
2554            assert_eq!(thread.entries.len(), 1);
2555            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2556                assert_eq!(user_msg.id, Some(message_1_id));
2557                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2558            } else {
2559                panic!("Expected UserMessage");
2560            }
2561        });
2562
2563        // Test creating new user message after assistant message
2564        thread.update(cx, |thread, cx| {
2565            thread.push_assistant_content_block(
2566                acp::ContentBlock::Text(acp::TextContent {
2567                    annotations: None,
2568                    text: "Assistant response".to_string(),
2569                    meta: None,
2570                }),
2571                false,
2572                cx,
2573            );
2574        });
2575
2576        let message_2_id = UserMessageId::new();
2577        thread.update(cx, |thread, cx| {
2578            thread.push_user_content_block(
2579                Some(message_2_id.clone()),
2580                acp::ContentBlock::Text(acp::TextContent {
2581                    annotations: None,
2582                    text: "New user message".to_string(),
2583                    meta: None,
2584                }),
2585                cx,
2586            );
2587        });
2588
2589        thread.update(cx, |thread, cx| {
2590            assert_eq!(thread.entries.len(), 3);
2591            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2592                assert_eq!(user_msg.id, Some(message_2_id));
2593                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2594            } else {
2595                panic!("Expected UserMessage at index 2");
2596            }
2597        });
2598    }
2599
2600    #[gpui::test]
2601    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2602        init_test(cx);
2603
2604        let fs = FakeFs::new(cx.executor());
2605        let project = Project::test(fs, [], cx).await;
2606        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2607            |_, thread, mut cx| {
2608                async move {
2609                    thread.update(&mut cx, |thread, cx| {
2610                        thread
2611                            .handle_session_update(
2612                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2613                                    content: "Thinking ".into(),
2614                                    meta: None,
2615                                }),
2616                                cx,
2617                            )
2618                            .unwrap();
2619                        thread
2620                            .handle_session_update(
2621                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2622                                    content: "hard!".into(),
2623                                    meta: None,
2624                                }),
2625                                cx,
2626                            )
2627                            .unwrap();
2628                    })?;
2629                    Ok(acp::PromptResponse {
2630                        stop_reason: acp::StopReason::EndTurn,
2631                        meta: None,
2632                    })
2633                }
2634                .boxed_local()
2635            },
2636        ));
2637
2638        let thread = cx
2639            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2640            .await
2641            .unwrap();
2642
2643        thread
2644            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2645            .await
2646            .unwrap();
2647
2648        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2649        assert_eq!(
2650            output,
2651            indoc! {r#"
2652            ## User
2653
2654            Hello from Zed!
2655
2656            ## Assistant
2657
2658            <thinking>
2659            Thinking hard!
2660            </thinking>
2661
2662            "#}
2663        );
2664    }
2665
2666    #[gpui::test]
2667    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2668        init_test(cx);
2669
2670        let fs = FakeFs::new(cx.executor());
2671        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2672            .await;
2673        let project = Project::test(fs.clone(), [], cx).await;
2674        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2675        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2676        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2677            move |_, thread, mut cx| {
2678                let read_file_tx = read_file_tx.clone();
2679                async move {
2680                    let content = thread
2681                        .update(&mut cx, |thread, cx| {
2682                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2683                        })
2684                        .unwrap()
2685                        .await
2686                        .unwrap();
2687                    assert_eq!(content, "one\ntwo\nthree\n");
2688                    read_file_tx.take().unwrap().send(()).unwrap();
2689                    thread
2690                        .update(&mut cx, |thread, cx| {
2691                            thread.write_text_file(
2692                                path!("/tmp/foo").into(),
2693                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2694                                cx,
2695                            )
2696                        })
2697                        .unwrap()
2698                        .await
2699                        .unwrap();
2700                    Ok(acp::PromptResponse {
2701                        stop_reason: acp::StopReason::EndTurn,
2702                        meta: None,
2703                    })
2704                }
2705                .boxed_local()
2706            },
2707        ));
2708
2709        let (worktree, pathbuf) = project
2710            .update(cx, |project, cx| {
2711                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2712            })
2713            .await
2714            .unwrap();
2715        let buffer = project
2716            .update(cx, |project, cx| {
2717                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2718            })
2719            .await
2720            .unwrap();
2721
2722        let thread = cx
2723            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2724            .await
2725            .unwrap();
2726
2727        let request = thread.update(cx, |thread, cx| {
2728            thread.send_raw("Extend the count in /tmp/foo", cx)
2729        });
2730        read_file_rx.await.ok();
2731        buffer.update(cx, |buffer, cx| {
2732            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2733        });
2734        cx.run_until_parked();
2735        assert_eq!(
2736            buffer.read_with(cx, |buffer, _| buffer.text()),
2737            "zero\none\ntwo\nthree\nfour\nfive\n"
2738        );
2739        assert_eq!(
2740            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2741            "zero\none\ntwo\nthree\nfour\nfive\n"
2742        );
2743        request.await.unwrap();
2744    }
2745
2746    #[gpui::test]
2747    async fn test_reading_from_line(cx: &mut TestAppContext) {
2748        init_test(cx);
2749
2750        let fs = FakeFs::new(cx.executor());
2751        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2752            .await;
2753        let project = Project::test(fs.clone(), [], cx).await;
2754        project
2755            .update(cx, |project, cx| {
2756                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2757            })
2758            .await
2759            .unwrap();
2760
2761        let connection = Rc::new(FakeAgentConnection::new());
2762
2763        let thread = cx
2764            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2765            .await
2766            .unwrap();
2767
2768        // Whole file
2769        let content = thread
2770            .update(cx, |thread, cx| {
2771                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2772            })
2773            .await
2774            .unwrap();
2775
2776        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2777
2778        // Only start line
2779        let content = thread
2780            .update(cx, |thread, cx| {
2781                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2782            })
2783            .await
2784            .unwrap();
2785
2786        assert_eq!(content, "three\nfour\n");
2787
2788        // Only limit
2789        let content = thread
2790            .update(cx, |thread, cx| {
2791                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2792            })
2793            .await
2794            .unwrap();
2795
2796        assert_eq!(content, "one\ntwo\n");
2797
2798        // Range
2799        let content = thread
2800            .update(cx, |thread, cx| {
2801                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2802            })
2803            .await
2804            .unwrap();
2805
2806        assert_eq!(content, "two\nthree\n");
2807
2808        // Invalid
2809        let err = thread
2810            .update(cx, |thread, cx| {
2811                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2812            })
2813            .await
2814            .unwrap_err();
2815
2816        assert_eq!(
2817            err.to_string(),
2818            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2819        );
2820    }
2821
2822    #[gpui::test]
2823    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2824        init_test(cx);
2825
2826        let fs = FakeFs::new(cx.executor());
2827        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2828        let project = Project::test(fs.clone(), [], cx).await;
2829        project
2830            .update(cx, |project, cx| {
2831                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2832            })
2833            .await
2834            .unwrap();
2835
2836        let connection = Rc::new(FakeAgentConnection::new());
2837
2838        let thread = cx
2839            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2840            .await
2841            .unwrap();
2842
2843        // Whole file
2844        let content = thread
2845            .update(cx, |thread, cx| {
2846                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2847            })
2848            .await
2849            .unwrap();
2850
2851        assert_eq!(content, "");
2852
2853        // Only start line
2854        let content = thread
2855            .update(cx, |thread, cx| {
2856                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2857            })
2858            .await
2859            .unwrap();
2860
2861        assert_eq!(content, "");
2862
2863        // Only limit
2864        let content = thread
2865            .update(cx, |thread, cx| {
2866                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2867            })
2868            .await
2869            .unwrap();
2870
2871        assert_eq!(content, "");
2872
2873        // Range
2874        let content = thread
2875            .update(cx, |thread, cx| {
2876                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2877            })
2878            .await
2879            .unwrap();
2880
2881        assert_eq!(content, "");
2882
2883        // Invalid
2884        let err = thread
2885            .update(cx, |thread, cx| {
2886                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2887            })
2888            .await
2889            .unwrap_err();
2890
2891        assert_eq!(
2892            err.to_string(),
2893            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2894        );
2895    }
2896    #[gpui::test]
2897    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2898        init_test(cx);
2899
2900        let fs = FakeFs::new(cx.executor());
2901        fs.insert_tree(path!("/tmp"), json!({})).await;
2902        let project = Project::test(fs.clone(), [], cx).await;
2903        project
2904            .update(cx, |project, cx| {
2905                project.find_or_create_worktree(path!("/tmp"), true, cx)
2906            })
2907            .await
2908            .unwrap();
2909
2910        let connection = Rc::new(FakeAgentConnection::new());
2911
2912        let thread = cx
2913            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2914            .await
2915            .unwrap();
2916
2917        // Out of project file
2918        let err = thread
2919            .update(cx, |thread, cx| {
2920                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2921            })
2922            .await
2923            .unwrap_err();
2924
2925        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2926    }
2927
2928    #[gpui::test]
2929    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2930        init_test(cx);
2931
2932        let fs = FakeFs::new(cx.executor());
2933        let project = Project::test(fs, [], cx).await;
2934        let id = acp::ToolCallId("test".into());
2935
2936        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2937            let id = id.clone();
2938            move |_, thread, mut cx| {
2939                let id = id.clone();
2940                async move {
2941                    thread
2942                        .update(&mut cx, |thread, cx| {
2943                            thread.handle_session_update(
2944                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2945                                    id: id.clone(),
2946                                    title: "Label".into(),
2947                                    kind: acp::ToolKind::Fetch,
2948                                    status: acp::ToolCallStatus::InProgress,
2949                                    content: vec![],
2950                                    locations: vec![],
2951                                    raw_input: None,
2952                                    raw_output: None,
2953                                    meta: None,
2954                                }),
2955                                cx,
2956                            )
2957                        })
2958                        .unwrap()
2959                        .unwrap();
2960                    Ok(acp::PromptResponse {
2961                        stop_reason: acp::StopReason::EndTurn,
2962                        meta: None,
2963                    })
2964                }
2965                .boxed_local()
2966            }
2967        }));
2968
2969        let thread = cx
2970            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2971            .await
2972            .unwrap();
2973
2974        let request = thread.update(cx, |thread, cx| {
2975            thread.send_raw("Fetch https://example.com", cx)
2976        });
2977
2978        run_until_first_tool_call(&thread, cx).await;
2979
2980        thread.read_with(cx, |thread, _| {
2981            assert!(matches!(
2982                thread.entries[1],
2983                AgentThreadEntry::ToolCall(ToolCall {
2984                    status: ToolCallStatus::InProgress,
2985                    ..
2986                })
2987            ));
2988        });
2989
2990        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2991
2992        thread.read_with(cx, |thread, _| {
2993            assert!(matches!(
2994                &thread.entries[1],
2995                AgentThreadEntry::ToolCall(ToolCall {
2996                    status: ToolCallStatus::Canceled,
2997                    ..
2998                })
2999            ));
3000        });
3001
3002        thread
3003            .update(cx, |thread, cx| {
3004                thread.handle_session_update(
3005                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3006                        id,
3007                        fields: acp::ToolCallUpdateFields {
3008                            status: Some(acp::ToolCallStatus::Completed),
3009                            ..Default::default()
3010                        },
3011                        meta: None,
3012                    }),
3013                    cx,
3014                )
3015            })
3016            .unwrap();
3017
3018        request.await.unwrap();
3019
3020        thread.read_with(cx, |thread, _| {
3021            assert!(matches!(
3022                thread.entries[1],
3023                AgentThreadEntry::ToolCall(ToolCall {
3024                    status: ToolCallStatus::Completed,
3025                    ..
3026                })
3027            ));
3028        });
3029    }
3030
3031    #[gpui::test]
3032    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3033        init_test(cx);
3034        let fs = FakeFs::new(cx.background_executor.clone());
3035        fs.insert_tree(path!("/test"), json!({})).await;
3036        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3037
3038        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3039            move |_, thread, mut cx| {
3040                async move {
3041                    thread
3042                        .update(&mut cx, |thread, cx| {
3043                            thread.handle_session_update(
3044                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3045                                    id: acp::ToolCallId("test".into()),
3046                                    title: "Label".into(),
3047                                    kind: acp::ToolKind::Edit,
3048                                    status: acp::ToolCallStatus::Completed,
3049                                    content: vec![acp::ToolCallContent::Diff {
3050                                        diff: acp::Diff {
3051                                            path: "/test/test.txt".into(),
3052                                            old_text: None,
3053                                            new_text: "foo".into(),
3054                                            meta: None,
3055                                        },
3056                                    }],
3057                                    locations: vec![],
3058                                    raw_input: None,
3059                                    raw_output: None,
3060                                    meta: None,
3061                                }),
3062                                cx,
3063                            )
3064                        })
3065                        .unwrap()
3066                        .unwrap();
3067                    Ok(acp::PromptResponse {
3068                        stop_reason: acp::StopReason::EndTurn,
3069                        meta: None,
3070                    })
3071                }
3072                .boxed_local()
3073            }
3074        }));
3075
3076        let thread = cx
3077            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3078            .await
3079            .unwrap();
3080
3081        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3082            .await
3083            .unwrap();
3084
3085        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3086    }
3087
3088    #[gpui::test(iterations = 10)]
3089    async fn test_checkpoints(cx: &mut TestAppContext) {
3090        init_test(cx);
3091        let fs = FakeFs::new(cx.background_executor.clone());
3092        fs.insert_tree(
3093            path!("/test"),
3094            json!({
3095                ".git": {}
3096            }),
3097        )
3098        .await;
3099        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3100
3101        let simulate_changes = Arc::new(AtomicBool::new(true));
3102        let next_filename = Arc::new(AtomicUsize::new(0));
3103        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3104            let simulate_changes = simulate_changes.clone();
3105            let next_filename = next_filename.clone();
3106            let fs = fs.clone();
3107            move |request, thread, mut cx| {
3108                let fs = fs.clone();
3109                let simulate_changes = simulate_changes.clone();
3110                let next_filename = next_filename.clone();
3111                async move {
3112                    if simulate_changes.load(SeqCst) {
3113                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3114                        fs.write(Path::new(&filename), b"").await?;
3115                    }
3116
3117                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3118                        panic!("expected text content block");
3119                    };
3120                    thread.update(&mut cx, |thread, cx| {
3121                        thread
3122                            .handle_session_update(
3123                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3124                                    content: content.text.to_uppercase().into(),
3125                                    meta: None,
3126                                }),
3127                                cx,
3128                            )
3129                            .unwrap();
3130                    })?;
3131                    Ok(acp::PromptResponse {
3132                        stop_reason: acp::StopReason::EndTurn,
3133                        meta: None,
3134                    })
3135                }
3136                .boxed_local()
3137            }
3138        }));
3139        let thread = cx
3140            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3141            .await
3142            .unwrap();
3143
3144        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3145            .await
3146            .unwrap();
3147        thread.read_with(cx, |thread, cx| {
3148            assert_eq!(
3149                thread.to_markdown(cx),
3150                indoc! {"
3151                    ## User (checkpoint)
3152
3153                    Lorem
3154
3155                    ## Assistant
3156
3157                    LOREM
3158
3159                "}
3160            );
3161        });
3162        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3163
3164        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3165            .await
3166            .unwrap();
3167        thread.read_with(cx, |thread, cx| {
3168            assert_eq!(
3169                thread.to_markdown(cx),
3170                indoc! {"
3171                    ## User (checkpoint)
3172
3173                    Lorem
3174
3175                    ## Assistant
3176
3177                    LOREM
3178
3179                    ## User (checkpoint)
3180
3181                    ipsum
3182
3183                    ## Assistant
3184
3185                    IPSUM
3186
3187                "}
3188            );
3189        });
3190        assert_eq!(
3191            fs.files(),
3192            vec![
3193                Path::new(path!("/test/file-0")),
3194                Path::new(path!("/test/file-1"))
3195            ]
3196        );
3197
3198        // Checkpoint isn't stored when there are no changes.
3199        simulate_changes.store(false, SeqCst);
3200        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3201            .await
3202            .unwrap();
3203        thread.read_with(cx, |thread, cx| {
3204            assert_eq!(
3205                thread.to_markdown(cx),
3206                indoc! {"
3207                    ## User (checkpoint)
3208
3209                    Lorem
3210
3211                    ## Assistant
3212
3213                    LOREM
3214
3215                    ## User (checkpoint)
3216
3217                    ipsum
3218
3219                    ## Assistant
3220
3221                    IPSUM
3222
3223                    ## User
3224
3225                    dolor
3226
3227                    ## Assistant
3228
3229                    DOLOR
3230
3231                "}
3232            );
3233        });
3234        assert_eq!(
3235            fs.files(),
3236            vec![
3237                Path::new(path!("/test/file-0")),
3238                Path::new(path!("/test/file-1"))
3239            ]
3240        );
3241
3242        // Rewinding the conversation truncates the history and restores the checkpoint.
3243        thread
3244            .update(cx, |thread, cx| {
3245                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3246                    panic!("unexpected entries {:?}", thread.entries)
3247                };
3248                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3249            })
3250            .await
3251            .unwrap();
3252        thread.read_with(cx, |thread, cx| {
3253            assert_eq!(
3254                thread.to_markdown(cx),
3255                indoc! {"
3256                    ## User (checkpoint)
3257
3258                    Lorem
3259
3260                    ## Assistant
3261
3262                    LOREM
3263
3264                "}
3265            );
3266        });
3267        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3268    }
3269
3270    #[gpui::test]
3271    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3272        use std::sync::atomic::AtomicUsize;
3273        init_test(cx);
3274
3275        let fs = FakeFs::new(cx.executor());
3276        let project = Project::test(fs, None, cx).await;
3277
3278        // Create a connection that simulates refusal after tool result
3279        let prompt_count = Arc::new(AtomicUsize::new(0));
3280        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3281            let prompt_count = prompt_count.clone();
3282            move |_request, thread, mut cx| {
3283                let count = prompt_count.fetch_add(1, SeqCst);
3284                async move {
3285                    if count == 0 {
3286                        // First prompt: Generate a tool call with result
3287                        thread.update(&mut cx, |thread, cx| {
3288                            thread
3289                                .handle_session_update(
3290                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3291                                        id: acp::ToolCallId("tool1".into()),
3292                                        title: "Test Tool".into(),
3293                                        kind: acp::ToolKind::Fetch,
3294                                        status: acp::ToolCallStatus::Completed,
3295                                        content: vec![],
3296                                        locations: vec![],
3297                                        raw_input: Some(serde_json::json!({"query": "test"})),
3298                                        raw_output: Some(
3299                                            serde_json::json!({"result": "inappropriate content"}),
3300                                        ),
3301                                        meta: None,
3302                                    }),
3303                                    cx,
3304                                )
3305                                .unwrap();
3306                        })?;
3307
3308                        // Now return refusal because of the tool result
3309                        Ok(acp::PromptResponse {
3310                            stop_reason: acp::StopReason::Refusal,
3311                            meta: None,
3312                        })
3313                    } else {
3314                        Ok(acp::PromptResponse {
3315                            stop_reason: acp::StopReason::EndTurn,
3316                            meta: None,
3317                        })
3318                    }
3319                }
3320                .boxed_local()
3321            }
3322        }));
3323
3324        let thread = cx
3325            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3326            .await
3327            .unwrap();
3328
3329        // Track if we see a Refusal event
3330        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3331        let saw_refusal_event_captured = saw_refusal_event.clone();
3332        thread.update(cx, |_thread, cx| {
3333            cx.subscribe(
3334                &thread,
3335                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3336                    if matches!(event, AcpThreadEvent::Refusal) {
3337                        *saw_refusal_event_captured.lock().unwrap() = true;
3338                    }
3339                },
3340            )
3341            .detach();
3342        });
3343
3344        // Send a user message - this will trigger tool call and then refusal
3345        let send_task = thread.update(cx, |thread, cx| {
3346            thread.send(
3347                vec![acp::ContentBlock::Text(acp::TextContent {
3348                    text: "Hello".into(),
3349                    annotations: None,
3350                    meta: None,
3351                })],
3352                cx,
3353            )
3354        });
3355        cx.background_executor.spawn(send_task).detach();
3356        cx.run_until_parked();
3357
3358        // Verify that:
3359        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3360        // 2. The user message was NOT truncated
3361        assert!(
3362            *saw_refusal_event.lock().unwrap(),
3363            "Refusal event should be emitted for tool result refusals"
3364        );
3365
3366        thread.read_with(cx, |thread, _| {
3367            let entries = thread.entries();
3368            assert!(entries.len() >= 2, "Should have user message and tool call");
3369
3370            // Verify user message is still there
3371            assert!(
3372                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3373                "User message should not be truncated"
3374            );
3375
3376            // Verify tool call is there with result
3377            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3378                assert!(
3379                    tool_call.raw_output.is_some(),
3380                    "Tool call should have output"
3381                );
3382            } else {
3383                panic!("Expected tool call at index 1");
3384            }
3385        });
3386    }
3387
3388    #[gpui::test]
3389    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3390        init_test(cx);
3391
3392        let fs = FakeFs::new(cx.executor());
3393        let project = Project::test(fs, None, cx).await;
3394
3395        let refuse_next = Arc::new(AtomicBool::new(false));
3396        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3397            let refuse_next = refuse_next.clone();
3398            move |_request, _thread, _cx| {
3399                if refuse_next.load(SeqCst) {
3400                    async move {
3401                        Ok(acp::PromptResponse {
3402                            stop_reason: acp::StopReason::Refusal,
3403                            meta: None,
3404                        })
3405                    }
3406                    .boxed_local()
3407                } else {
3408                    async move {
3409                        Ok(acp::PromptResponse {
3410                            stop_reason: acp::StopReason::EndTurn,
3411                            meta: None,
3412                        })
3413                    }
3414                    .boxed_local()
3415                }
3416            }
3417        }));
3418
3419        let thread = cx
3420            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3421            .await
3422            .unwrap();
3423
3424        // Track if we see a Refusal event
3425        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3426        let saw_refusal_event_captured = saw_refusal_event.clone();
3427        thread.update(cx, |_thread, cx| {
3428            cx.subscribe(
3429                &thread,
3430                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3431                    if matches!(event, AcpThreadEvent::Refusal) {
3432                        *saw_refusal_event_captured.lock().unwrap() = true;
3433                    }
3434                },
3435            )
3436            .detach();
3437        });
3438
3439        // Send a message that will be refused
3440        refuse_next.store(true, SeqCst);
3441        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3442            .await
3443            .unwrap();
3444
3445        // Verify that a Refusal event WAS emitted for user prompt refusal
3446        assert!(
3447            *saw_refusal_event.lock().unwrap(),
3448            "Refusal event should be emitted for user prompt refusals"
3449        );
3450
3451        // Verify the message was truncated (user prompt refusal)
3452        thread.read_with(cx, |thread, cx| {
3453            assert_eq!(thread.to_markdown(cx), "");
3454        });
3455    }
3456
3457    #[gpui::test]
3458    async fn test_refusal(cx: &mut TestAppContext) {
3459        init_test(cx);
3460        let fs = FakeFs::new(cx.background_executor.clone());
3461        fs.insert_tree(path!("/"), json!({})).await;
3462        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3463
3464        let refuse_next = Arc::new(AtomicBool::new(false));
3465        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3466            let refuse_next = refuse_next.clone();
3467            move |request, thread, mut cx| {
3468                let refuse_next = refuse_next.clone();
3469                async move {
3470                    if refuse_next.load(SeqCst) {
3471                        return Ok(acp::PromptResponse {
3472                            stop_reason: acp::StopReason::Refusal,
3473                            meta: None,
3474                        });
3475                    }
3476
3477                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3478                        panic!("expected text content block");
3479                    };
3480                    thread.update(&mut cx, |thread, cx| {
3481                        thread
3482                            .handle_session_update(
3483                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3484                                    content: content.text.to_uppercase().into(),
3485                                    meta: None,
3486                                }),
3487                                cx,
3488                            )
3489                            .unwrap();
3490                    })?;
3491                    Ok(acp::PromptResponse {
3492                        stop_reason: acp::StopReason::EndTurn,
3493                        meta: None,
3494                    })
3495                }
3496                .boxed_local()
3497            }
3498        }));
3499        let thread = cx
3500            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3501            .await
3502            .unwrap();
3503
3504        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3505            .await
3506            .unwrap();
3507        thread.read_with(cx, |thread, cx| {
3508            assert_eq!(
3509                thread.to_markdown(cx),
3510                indoc! {"
3511                    ## User
3512
3513                    hello
3514
3515                    ## Assistant
3516
3517                    HELLO
3518
3519                "}
3520            );
3521        });
3522
3523        // Simulate refusing the second message. The message should be truncated
3524        // when a user prompt is refused.
3525        refuse_next.store(true, SeqCst);
3526        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3527            .await
3528            .unwrap();
3529        thread.read_with(cx, |thread, cx| {
3530            assert_eq!(
3531                thread.to_markdown(cx),
3532                indoc! {"
3533                    ## User
3534
3535                    hello
3536
3537                    ## Assistant
3538
3539                    HELLO
3540
3541                "}
3542            );
3543        });
3544    }
3545
3546    async fn run_until_first_tool_call(
3547        thread: &Entity<AcpThread>,
3548        cx: &mut TestAppContext,
3549    ) -> usize {
3550        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3551
3552        let subscription = cx.update(|cx| {
3553            cx.subscribe(thread, move |thread, _, cx| {
3554                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3555                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3556                        return tx.try_send(ix).unwrap();
3557                    }
3558                }
3559            })
3560        });
3561
3562        select! {
3563            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3564                panic!("Timeout waiting for tool call")
3565            }
3566            ix = rx.next().fuse() => {
3567                drop(subscription);
3568                ix.unwrap()
3569            }
3570        }
3571    }
3572
3573    #[derive(Clone, Default)]
3574    struct FakeAgentConnection {
3575        auth_methods: Vec<acp::AuthMethod>,
3576        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3577        on_user_message: Option<
3578            Rc<
3579                dyn Fn(
3580                        acp::PromptRequest,
3581                        WeakEntity<AcpThread>,
3582                        AsyncApp,
3583                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3584                    + 'static,
3585            >,
3586        >,
3587    }
3588
3589    impl FakeAgentConnection {
3590        fn new() -> Self {
3591            Self {
3592                auth_methods: Vec::new(),
3593                on_user_message: None,
3594                sessions: Arc::default(),
3595            }
3596        }
3597
3598        #[expect(unused)]
3599        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3600            self.auth_methods = auth_methods;
3601            self
3602        }
3603
3604        fn on_user_message(
3605            mut self,
3606            handler: impl Fn(
3607                acp::PromptRequest,
3608                WeakEntity<AcpThread>,
3609                AsyncApp,
3610            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3611            + 'static,
3612        ) -> Self {
3613            self.on_user_message.replace(Rc::new(handler));
3614            self
3615        }
3616    }
3617
3618    impl AgentConnection for FakeAgentConnection {
3619        fn auth_methods(&self) -> &[acp::AuthMethod] {
3620            &self.auth_methods
3621        }
3622
3623        fn new_thread(
3624            self: Rc<Self>,
3625            project: Entity<Project>,
3626            _cwd: &Path,
3627            cx: &mut App,
3628        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3629            let session_id = acp::SessionId(
3630                rand::rng()
3631                    .sample_iter(&distr::Alphanumeric)
3632                    .take(7)
3633                    .map(char::from)
3634                    .collect::<String>()
3635                    .into(),
3636            );
3637            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3638            let thread = cx.new(|cx| {
3639                AcpThread::new(
3640                    "Test",
3641                    self.clone(),
3642                    project,
3643                    action_log,
3644                    session_id.clone(),
3645                    watch::Receiver::constant(acp::PromptCapabilities {
3646                        image: true,
3647                        audio: true,
3648                        embedded_context: true,
3649                        meta: None,
3650                    }),
3651                    cx,
3652                )
3653            });
3654            self.sessions.lock().insert(session_id, thread.downgrade());
3655            Task::ready(Ok(thread))
3656        }
3657
3658        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3659            if self.auth_methods().iter().any(|m| m.id == method) {
3660                Task::ready(Ok(()))
3661            } else {
3662                Task::ready(Err(anyhow!("Invalid Auth Method")))
3663            }
3664        }
3665
3666        fn prompt(
3667            &self,
3668            _id: Option<UserMessageId>,
3669            params: acp::PromptRequest,
3670            cx: &mut App,
3671        ) -> Task<gpui::Result<acp::PromptResponse>> {
3672            let sessions = self.sessions.lock();
3673            let thread = sessions.get(&params.session_id).unwrap();
3674            if let Some(handler) = &self.on_user_message {
3675                let handler = handler.clone();
3676                let thread = thread.clone();
3677                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3678            } else {
3679                Task::ready(Ok(acp::PromptResponse {
3680                    stop_reason: acp::StopReason::EndTurn,
3681                    meta: None,
3682                }))
3683            }
3684        }
3685
3686        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3687            let sessions = self.sessions.lock();
3688            let thread = sessions.get(session_id).unwrap().clone();
3689
3690            cx.spawn(async move |cx| {
3691                thread
3692                    .update(cx, |thread, cx| thread.cancel(cx))
3693                    .unwrap()
3694                    .await
3695            })
3696            .detach();
3697        }
3698
3699        fn truncate(
3700            &self,
3701            session_id: &acp::SessionId,
3702            _cx: &App,
3703        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3704            Some(Rc::new(FakeAgentSessionEditor {
3705                _session_id: session_id.clone(),
3706            }))
3707        }
3708
3709        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3710            self
3711        }
3712    }
3713
3714    struct FakeAgentSessionEditor {
3715        _session_id: acp::SessionId,
3716    }
3717
3718    impl AgentSessionTruncate for FakeAgentSessionEditor {
3719        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3720            Task::ready(Ok(()))
3721        }
3722    }
3723
3724    #[gpui::test]
3725    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3726        init_test(cx);
3727
3728        let fs = FakeFs::new(cx.executor());
3729        let project = Project::test(fs, [], cx).await;
3730        let connection = Rc::new(FakeAgentConnection::new());
3731        let thread = cx
3732            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3733            .await
3734            .unwrap();
3735
3736        // Try to update a tool call that doesn't exist
3737        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3738        thread.update(cx, |thread, cx| {
3739            let result = thread.handle_session_update(
3740                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3741                    id: nonexistent_id.clone(),
3742                    fields: acp::ToolCallUpdateFields {
3743                        status: Some(acp::ToolCallStatus::Completed),
3744                        ..Default::default()
3745                    },
3746                    meta: None,
3747                }),
3748                cx,
3749            );
3750
3751            // The update should succeed (not return an error)
3752            assert!(result.is_ok());
3753
3754            // There should now be exactly one entry in the thread
3755            assert_eq!(thread.entries.len(), 1);
3756
3757            // The entry should be a failed tool call
3758            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3759                assert_eq!(tool_call.id, nonexistent_id);
3760                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3761                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3762
3763                // Check that the content contains the error message
3764                assert_eq!(tool_call.content.len(), 1);
3765                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3766                    match content_block {
3767                        ContentBlock::Markdown { markdown } => {
3768                            let markdown_text = markdown.read(cx).source();
3769                            assert!(markdown_text.contains("Tool call not found"));
3770                        }
3771                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3772                        ContentBlock::ResourceLink { .. } => {
3773                            panic!("Expected markdown content, got resource link")
3774                        }
3775                    }
3776                } else {
3777                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3778                }
3779            } else {
3780                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3781            }
3782        });
3783    }
3784}