acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use agent_settings::AgentSettings;
   7use collections::HashSet;
   8pub use connection::*;
   9pub use diff::*;
  10use language::language_settings::FormatOnSave;
  11pub use mention::*;
  12use project::lsp_store::{FormatTrigger, LspFormatTarget};
  13use serde::{Deserialize, Serialize};
  14use settings::Settings as _;
  15use task::{Shell, ShellBuilder};
  16pub use terminal::*;
  17
  18use action_log::{ActionLog, ActionLogTelemetry};
  19use agent_client_protocol::{self as acp};
  20use anyhow::{Context as _, Result, anyhow};
  21use editor::Bias;
  22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  24use itertools::Itertools;
  25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  26use markdown::Markdown;
  27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  28use std::collections::HashMap;
  29use std::error::Error;
  30use std::fmt::{Formatter, Write};
  31use std::ops::Range;
  32use std::process::ExitStatus;
  33use std::rc::Rc;
  34use std::time::{Duration, Instant};
  35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  36use ui::App;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
/// A user-authored entry in an agent thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of this message, if it has one.
    pub id: Option<UserMessageId>,
    /// Display form of the message; this is what `to_markdown` renders.
    pub content: ContentBlock,
    /// Raw ACP content blocks of the message.
    /// NOTE(review): presumably the blocks `content` was built from — confirm
    /// at the construction site.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  47
/// A git-store checkpoint associated with a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the project's git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message's markdown heading is rendered as
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
  53
  54impl UserMessage {
  55    fn to_markdown(&self, cx: &App) -> String {
  56        let mut markdown = String::new();
  57        if self
  58            .checkpoint
  59            .as_ref()
  60            .is_some_and(|checkpoint| checkpoint.show)
  61        {
  62            writeln!(markdown, "## User (checkpoint)").unwrap();
  63        } else {
  64            writeln!(markdown, "## User").unwrap();
  65        }
  66        writeln!(markdown).unwrap();
  67        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  68        writeln!(markdown).unwrap();
  69        markdown
  70    }
  71}
  72
/// An assistant-authored entry in an agent thread, made up of ordered chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks of this message, in the order they are rendered.
    pub chunks: Vec<AssistantMessageChunk>,
}
  77
  78impl AssistantMessage {
  79    pub fn to_markdown(&self, cx: &App) -> String {
  80        format!(
  81            "## Assistant\n\n{}\n\n",
  82            self.chunks
  83                .iter()
  84                .map(|chunk| chunk.to_markdown(cx))
  85                .join("\n\n")
  86        )
  87    }
  88}
  89
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; rendered as-is in markdown.
    Message { block: ContentBlock },
    /// Thought content; rendered wrapped in `<thinking>` tags in markdown.
    Thought { block: ContentBlock },
}
  95
  96impl AssistantMessageChunk {
  97    pub fn from_str(
  98        chunk: &str,
  99        language_registry: &Arc<LanguageRegistry>,
 100        path_style: PathStyle,
 101        cx: &mut App,
 102    ) -> Self {
 103        Self::Message {
 104            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 105        }
 106    }
 107
 108    fn to_markdown(&self, cx: &App) -> String {
 109        match self {
 110            Self::Message { block } => block.to_markdown(cx).to_string(),
 111            Self::Thought { block } => {
 112                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 113            }
 114        }
 115    }
 116}
 117
/// A single entry in an agent thread: a user message, an assistant message,
/// or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation and its content.
    ToolCall(ToolCall),
}
 124
 125impl AgentThreadEntry {
 126    pub fn to_markdown(&self, cx: &App) -> String {
 127        match self {
 128            Self::UserMessage(message) => message.to_markdown(cx),
 129            Self::AssistantMessage(message) => message.to_markdown(cx),
 130            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 131        }
 132    }
 133
 134    pub fn user_message(&self) -> Option<&UserMessage> {
 135        if let AgentThreadEntry::UserMessage(message) = self {
 136            Some(message)
 137        } else {
 138            None
 139        }
 140    }
 141
 142    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 143        if let AgentThreadEntry::ToolCall(call) = self {
 144            itertools::Either::Left(call.diffs())
 145        } else {
 146            itertools::Either::Right(std::iter::empty())
 147        }
 148    }
 149
 150    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 151        if let AgentThreadEntry::ToolCall(call) = self {
 152            itertools::Either::Left(call.terminals())
 153        } else {
 154            itertools::Either::Right(std::iter::empty())
 155        }
 156    }
 157
 158    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 159        if let AgentThreadEntry::ToolCall(ToolCall {
 160            locations,
 161            resolved_locations,
 162            ..
 163        }) = self
 164        {
 165            Some((
 166                locations.get(ix)?.clone(),
 167                resolved_locations.get(ix)?.clone()?,
 168            ))
 169        } else {
 170            None
 171        }
 172    }
 173}
 174
/// A tool invocation shown in the thread, together with its content and
/// status.
#[derive(Debug)]
pub struct ToolCall {
    /// Identifier of the tool call as given by the ACP tool call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the tool call's title.
    pub label: Entity<Markdown>,
    /// The kind of tool being invoked.
    pub kind: acp::ToolKind,
    /// Content produced by the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current lifecycle status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for this tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index;
    /// `None` marks a location that has no resolved value.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input of the tool call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw output of the tool call, if provided.
    pub raw_output: Option<serde_json::Value>,
}
 187
 188impl ToolCall {
 189    fn from_acp(
 190        tool_call: acp::ToolCall,
 191        status: ToolCallStatus,
 192        language_registry: Arc<LanguageRegistry>,
 193        path_style: PathStyle,
 194        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 195        cx: &mut App,
 196    ) -> Result<Self> {
 197        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 198            first_line.to_owned() + ""
 199        } else {
 200            tool_call.title
 201        };
 202        let mut content = Vec::with_capacity(tool_call.content.len());
 203        for item in tool_call.content {
 204            content.push(ToolCallContent::from_acp(
 205                item,
 206                language_registry.clone(),
 207                path_style,
 208                terminals,
 209                cx,
 210            )?);
 211        }
 212
 213        let result = Self {
 214            id: tool_call.id,
 215            label: cx
 216                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 217            kind: tool_call.kind,
 218            content,
 219            locations: tool_call.locations,
 220            resolved_locations: Vec::default(),
 221            status,
 222            raw_input: tool_call.raw_input,
 223            raw_output: tool_call.raw_output,
 224        };
 225        Ok(result)
 226    }
 227
 228    fn update_fields(
 229        &mut self,
 230        fields: acp::ToolCallUpdateFields,
 231        language_registry: Arc<LanguageRegistry>,
 232        path_style: PathStyle,
 233        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 234        cx: &mut App,
 235    ) -> Result<()> {
 236        let acp::ToolCallUpdateFields {
 237            kind,
 238            status,
 239            title,
 240            content,
 241            locations,
 242            raw_input,
 243            raw_output,
 244        } = fields;
 245
 246        if let Some(kind) = kind {
 247            self.kind = kind;
 248        }
 249
 250        if let Some(status) = status {
 251            self.status = status.into();
 252        }
 253
 254        if let Some(title) = title {
 255            self.label.update(cx, |label, cx| {
 256                if let Some((first_line, _)) = title.split_once("\n") {
 257                    label.replace(first_line.to_owned() + "", cx)
 258                } else {
 259                    label.replace(title, cx);
 260                }
 261            });
 262        }
 263
 264        if let Some(content) = content {
 265            let new_content_len = content.len();
 266            let mut content = content.into_iter();
 267
 268            // Reuse existing content if we can
 269            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 270                old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 271            }
 272            for new in content {
 273                self.content.push(ToolCallContent::from_acp(
 274                    new,
 275                    language_registry.clone(),
 276                    path_style,
 277                    terminals,
 278                    cx,
 279                )?)
 280            }
 281            self.content.truncate(new_content_len);
 282        }
 283
 284        if let Some(locations) = locations {
 285            self.locations = locations;
 286        }
 287
 288        if let Some(raw_input) = raw_input {
 289            self.raw_input = Some(raw_input);
 290        }
 291
 292        if let Some(raw_output) = raw_output {
 293            if self.content.is_empty()
 294                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 295            {
 296                self.content
 297                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 298                        markdown,
 299                    }));
 300            }
 301            self.raw_output = Some(raw_output);
 302        }
 303        Ok(())
 304    }
 305
 306    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Diff(diff) => Some(diff),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Terminal(_) => None,
 311        })
 312    }
 313
 314    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 315        self.content.iter().filter_map(|content| match content {
 316            ToolCallContent::Terminal(terminal) => Some(terminal),
 317            ToolCallContent::ContentBlock(_) => None,
 318            ToolCallContent::Diff(_) => None,
 319        })
 320    }
 321
 322    fn to_markdown(&self, cx: &App) -> String {
 323        let mut markdown = format!(
 324            "**Tool Call: {}**\nStatus: {}\n\n",
 325            self.label.read(cx).source(),
 326            self.status
 327        );
 328        for content in &self.content {
 329            markdown.push_str(content.to_markdown(cx).as_str());
 330            markdown.push_str("\n\n");
 331        }
 332        markdown
 333    }
 334
 335    async fn resolve_location(
 336        location: acp::ToolCallLocation,
 337        project: WeakEntity<Project>,
 338        cx: &mut AsyncApp,
 339    ) -> Option<ResolvedLocation> {
 340        let buffer = project
 341            .update(cx, |project, cx| {
 342                project
 343                    .project_path_for_absolute_path(&location.path, cx)
 344                    .map(|path| project.open_buffer(path, cx))
 345            })
 346            .ok()??;
 347        let buffer = buffer.await.log_err()?;
 348        let position = buffer
 349            .update(cx, |buffer, _| {
 350                if let Some(row) = location.line {
 351                    let snapshot = buffer.snapshot();
 352                    let column = snapshot.indent_size_for_line(row).len;
 353                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 354                    snapshot.anchor_before(point)
 355                } else {
 356                    Anchor::MIN
 357                }
 358            })
 359            .ok()?;
 360
 361        Some(ResolvedLocation { buffer, position })
 362    }
 363
 364    fn resolve_locations(
 365        &self,
 366        project: Entity<Project>,
 367        cx: &mut App,
 368    ) -> Task<Vec<Option<ResolvedLocation>>> {
 369        let locations = self.locations.clone();
 370        project.update(cx, |_, cx| {
 371            cx.spawn(async move |project, cx| {
 372                let mut new_locations = Vec::new();
 373                for location in locations {
 374                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 375                }
 376                new_locations
 377            })
 378        })
 379    }
 380}
 381
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to a buffer and an anchor within it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    /// Anchor within `buffer` that the location refers to.
    position: Anchor,
}
 389
 390impl From<&ResolvedLocation> for AgentLocation {
 391    fn from(value: &ResolvedLocation) -> Self {
 392        Self {
 393            buffer: value.buffer.downgrade(),
 394            position: value.position,
 395        }
 396    }
 397}
 398
/// Lifecycle status of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options available for this tool call.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the id of a permission option is sent back.
        /// NOTE(review): presumably the option the user picked — confirm at
        /// the send site.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 420
 421impl From<acp::ToolCallStatus> for ToolCallStatus {
 422    fn from(status: acp::ToolCallStatus) -> Self {
 423        match status {
 424            acp::ToolCallStatus::Pending => Self::Pending,
 425            acp::ToolCallStatus::InProgress => Self::InProgress,
 426            acp::ToolCallStatus::Completed => Self::Completed,
 427            acp::ToolCallStatus::Failed => Self::Failed,
 428        }
 429    }
 430}
 431
 432impl Display for ToolCallStatus {
 433    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 434        write!(
 435            f,
 436            "{}",
 437            match self {
 438                ToolCallStatus::Pending => "Pending",
 439                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 440                ToolCallStatus::InProgress => "In Progress",
 441                ToolCallStatus::Completed => "Completed",
 442                ToolCallStatus::Failed => "Failed",
 443                ToolCallStatus::Rejected => "Rejected",
 444                ToolCallStatus::Canceled => "Canceled",
 445            }
 446        )
 447    }
 448}
 449
/// Displayable content accumulated from one or more ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Content held as a markdown entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link; this is the state after a resource link is
    /// appended to an empty block.
    ResourceLink { resource_link: acp::ResourceLink },
}
 456
 457impl ContentBlock {
 458    pub fn new(
 459        block: acp::ContentBlock,
 460        language_registry: &Arc<LanguageRegistry>,
 461        path_style: PathStyle,
 462        cx: &mut App,
 463    ) -> Self {
 464        let mut this = Self::Empty;
 465        this.append(block, language_registry, path_style, cx);
 466        this
 467    }
 468
 469    pub fn new_combined(
 470        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 471        language_registry: Arc<LanguageRegistry>,
 472        path_style: PathStyle,
 473        cx: &mut App,
 474    ) -> Self {
 475        let mut this = Self::Empty;
 476        for block in blocks {
 477            this.append(block, &language_registry, path_style, cx);
 478        }
 479        this
 480    }
 481
 482    pub fn append(
 483        &mut self,
 484        block: acp::ContentBlock,
 485        language_registry: &Arc<LanguageRegistry>,
 486        path_style: PathStyle,
 487        cx: &mut App,
 488    ) {
 489        if matches!(self, ContentBlock::Empty)
 490            && let acp::ContentBlock::ResourceLink(resource_link) = block
 491        {
 492            *self = ContentBlock::ResourceLink { resource_link };
 493            return;
 494        }
 495
 496        let new_content = self.block_string_contents(block, path_style);
 497
 498        match self {
 499            ContentBlock::Empty => {
 500                *self = Self::create_markdown_block(new_content, language_registry, cx);
 501            }
 502            ContentBlock::Markdown { markdown } => {
 503                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 504            }
 505            ContentBlock::ResourceLink { resource_link } => {
 506                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 507                let combined = format!("{}\n{}", existing_content, new_content);
 508
 509                *self = Self::create_markdown_block(combined, language_registry, cx);
 510            }
 511        }
 512    }
 513
 514    fn create_markdown_block(
 515        content: String,
 516        language_registry: &Arc<LanguageRegistry>,
 517        cx: &mut App,
 518    ) -> ContentBlock {
 519        ContentBlock::Markdown {
 520            markdown: cx
 521                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 522        }
 523    }
 524
 525    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 526        match block {
 527            acp::ContentBlock::Text(text_content) => text_content.text,
 528            acp::ContentBlock::ResourceLink(resource_link) => {
 529                Self::resource_link_md(&resource_link.uri, path_style)
 530            }
 531            acp::ContentBlock::Resource(acp::EmbeddedResource {
 532                resource:
 533                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 534                        uri,
 535                        ..
 536                    }),
 537                ..
 538            }) => Self::resource_link_md(&uri, path_style),
 539            acp::ContentBlock::Image(image) => Self::image_md(&image),
 540            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 541        }
 542    }
 543
 544    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 545        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 546            uri.as_link().to_string()
 547        } else {
 548            uri.to_string()
 549        }
 550    }
 551
 552    fn image_md(_image: &acp::ImageContent) -> String {
 553        "`Image`".into()
 554    }
 555
 556    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 557        match self {
 558            ContentBlock::Empty => "",
 559            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 560            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 561        }
 562    }
 563
 564    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 565        match self {
 566            ContentBlock::Empty => None,
 567            ContentBlock::Markdown { markdown } => Some(markdown),
 568            ContentBlock::ResourceLink { .. } => None,
 569        }
 570    }
 571
 572    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 573        match self {
 574            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 575            _ => None,
 576        }
 577    }
 578}
 579
/// One item of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// A content block.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 586
 587impl ToolCallContent {
 588    pub fn from_acp(
 589        content: acp::ToolCallContent,
 590        language_registry: Arc<LanguageRegistry>,
 591        path_style: PathStyle,
 592        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 593        cx: &mut App,
 594    ) -> Result<Self> {
 595        match content {
 596            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 597                content,
 598                &language_registry,
 599                path_style,
 600                cx,
 601            ))),
 602            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 603                Diff::finalized(
 604                    diff.path.to_string_lossy().into_owned(),
 605                    diff.old_text,
 606                    diff.new_text,
 607                    language_registry,
 608                    cx,
 609                )
 610            }))),
 611            acp::ToolCallContent::Terminal { terminal_id } => terminals
 612                .get(&terminal_id)
 613                .cloned()
 614                .map(Self::Terminal)
 615                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 616        }
 617    }
 618
 619    pub fn update_from_acp(
 620        &mut self,
 621        new: acp::ToolCallContent,
 622        language_registry: Arc<LanguageRegistry>,
 623        path_style: PathStyle,
 624        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 625        cx: &mut App,
 626    ) -> Result<()> {
 627        let needs_update = match (&self, &new) {
 628            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 629                old_diff.read(cx).needs_update(
 630                    new_diff.old_text.as_deref().unwrap_or(""),
 631                    &new_diff.new_text,
 632                    cx,
 633                )
 634            }
 635            _ => true,
 636        };
 637
 638        if needs_update {
 639            *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
 640        }
 641        Ok(())
 642    }
 643
 644    pub fn to_markdown(&self, cx: &App) -> String {
 645        match self {
 646            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 647            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 648            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 649        }
 650    }
 651}
 652
/// An update targeting an existing tool call, identified by its id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// An ACP update carrying new field values.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 659
 660impl ToolCallUpdate {
 661    fn id(&self) -> &acp::ToolCallId {
 662        match self {
 663            Self::UpdateFields(update) => &update.id,
 664            Self::UpdateDiff(diff) => &diff.id,
 665            Self::UpdateTerminal(terminal) => &terminal.id,
 666        }
 667    }
 668}
 669
 670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 671    fn from(update: acp::ToolCallUpdate) -> Self {
 672        Self::UpdateFields(update)
 673    }
 674}
 675
 676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 677    fn from(diff: ToolCallUpdateDiff) -> Self {
 678        Self::UpdateDiff(diff)
 679    }
 680}
 681
/// A tool call update that carries a diff entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity carried by this update.
    pub diff: Entity<Diff>,
}
 687
 688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 689    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 690        Self::UpdateTerminal(terminal)
 691    }
 692}
 693
/// A tool call update that carries a terminal entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by this update.
    pub terminal: Entity<Terminal>,
}
 699
/// An ordered list of plan entries; empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
 704
/// Summary of a [`Plan`]'s entries by status, as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries with pending status.
    pub pending: u32,
    /// Number of entries with completed status.
    pub completed: u32,
}
 711
 712impl Plan {
 713    pub fn is_empty(&self) -> bool {
 714        self.entries.is_empty()
 715    }
 716
 717    pub fn stats(&self) -> PlanStats<'_> {
 718        let mut stats = PlanStats {
 719            in_progress_entry: None,
 720            pending: 0,
 721            completed: 0,
 722        };
 723
 724        for entry in &self.entries {
 725            match &entry.status {
 726                acp::PlanEntryStatus::Pending => {
 727                    stats.pending += 1;
 728                }
 729                acp::PlanEntryStatus::InProgress => {
 730                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 731                }
 732                acp::PlanEntryStatus::Completed => {
 733                    stats.completed += 1;
 734                }
 735            }
 736        }
 737
 738        stats
 739    }
 740}
 741
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as provided by ACP.
    pub priority: acp::PlanEntryPriority,
    /// Status of the entry (pending, in progress, or completed).
    pub status: acp::PlanEntryStatus,
}
 748
 749impl PlanEntry {
 750    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 751        Self {
 752            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 753            priority: entry.priority,
 754            status: entry.status,
 755        }
 756    }
 757}
 758
/// Token consumption of a thread relative to its limit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens available; `0` is treated as "unknown" by
    /// `ratio`, which then never reports a warning.
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
 764
 765impl TokenUsage {
 766    pub fn ratio(&self) -> TokenUsageRatio {
 767        #[cfg(debug_assertions)]
 768        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 769            .unwrap_or("0.8".to_string())
 770            .parse()
 771            .unwrap();
 772        #[cfg(not(debug_assertions))]
 773        let warning_threshold: f32 = 0.8;
 774
 775        // When the maximum is unknown because there is no selected model,
 776        // avoid showing the token limit warning.
 777        if self.max_tokens == 0 {
 778            TokenUsageRatio::Normal
 779        } else if self.used_tokens >= self.max_tokens {
 780            TokenUsageRatio::Exceeded
 781        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 782            TokenUsageRatio::Warning
 783        } else {
 784            TokenUsageRatio::Normal
 785        }
 786    }
 787}
 788
/// Classification of token usage relative to the token limit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
 795
/// Information about a retry, carried by `AcpThreadEvent::Retry`.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Text of the most recent error.
    pub last_error: SharedString,
    /// Attempt counter.
    /// NOTE(review): whether this is zero- or one-based is not visible here.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    /// Instant associated with the start of the retry.
    /// NOTE(review): presumably when the wait before this attempt began — confirm.
    pub started_at: Instant,
    /// Duration associated with the retry.
    /// NOTE(review): presumably the delay before the next attempt — confirm.
    pub duration: Duration,
}
 804
/// State of a single agent thread backed by an [`AgentConnection`].
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Entries of the thread, in order.
    entries: Vec<AgentThreadEntry>,
    /// The thread's current plan.
    plan: Plan,
    /// Project this thread operates on.
    project: Entity<Project>,
    /// Action log associated with this thread.
    action_log: Entity<ActionLog>,
    /// Snapshots keyed by buffer.
    /// NOTE(review): presumably the buffer state last shared with the agent — confirm.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task for the in-flight send, if any.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Session id of this thread on the connection.
    session_id: acp::SessionId,
    /// Latest known token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Prompt capabilities currently known for the connection.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task kept alive for as long as the thread exists.
    /// NOTE(review): by its name it observes prompt capability changes — confirm.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Registered terminals, keyed by terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminals that are not registered yet;
    /// replayed when the terminal's `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminals that are not registered yet.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
 822
 823impl From<&AcpThread> for ActionLogTelemetry {
 824    fn from(value: &AcpThread) -> Self {
 825        Self {
 826            agent_telemetry_id: value.connection().telemetry_id(),
 827            session_id: value.session_id.0.clone(),
 828        }
 829    }
 830}
 831
/// Events emitted by an [`AcpThread`].
///
/// NOTE(review): the emit sites are outside this view; the per-variant notes
/// below are read off the variant names and payload types only.
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// An entry was added.
    NewEntry,
    /// The title changed.
    TitleUpdated,
    /// The token usage changed.
    TokenUsageUpdated,
    /// Carries the index of an updated entry.
    EntryUpdated(usize),
    /// Carries the index range of removed entries.
    EntriesRemoved(Range<usize>),
    /// A tool call needs authorization.
    ToolAuthorizationRequired,
    /// Carries the status of a retry.
    Retry(RetryStatus),
    /// The thread stopped.
    Stopped,
    /// An error occurred.
    Error,
    /// Carries a load error.
    LoadError(LoadError),
    /// The prompt capabilities changed.
    PromptCapabilitiesUpdated,
    /// A refusal occurred.
    Refusal,
    /// Carries the updated list of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// Carries the id of a session mode.
    ModeUpdated(acp::SessionModeId),
}
 849
// Lets `AcpThread` emit `AcpThreadEvent`s.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 851
/// Events about terminals, handled by
/// `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread.
    Created {
        /// Id under which the terminal is registered.
        terminal_id: acp::TerminalId,
        /// Label for the terminal.
        label: String,
        /// Working directory of the terminal, if known.
        cwd: Option<PathBuf>,
        /// Optional limit on retained output, in bytes.
        output_byte_limit: Option<u64>,
        /// The underlying terminal entity.
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered by the thread when the terminal
    /// is not registered yet.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed; ignored when the terminal is not
    /// registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited; the status is stored by the thread when the
    /// terminal is not registered yet.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
 874
/// Commands addressed to a terminal by id.
///
/// NOTE(review): the consumer of these commands is outside this view; the
/// notes below are read off the variant and field names only.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write `bytes` as input to the terminal.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to `cols` x `rows`.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
 890
 891impl AcpThread {
 892    pub fn on_terminal_provider_event(
 893        &mut self,
 894        event: TerminalProviderEvent,
 895        cx: &mut Context<Self>,
 896    ) {
 897        match event {
 898            TerminalProviderEvent::Created {
 899                terminal_id,
 900                label,
 901                cwd,
 902                output_byte_limit,
 903                terminal,
 904            } => {
 905                let entity = self.register_terminal_created(
 906                    terminal_id.clone(),
 907                    label,
 908                    cwd,
 909                    output_byte_limit,
 910                    terminal,
 911                    cx,
 912                );
 913
 914                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 915                    for data in chunks.drain(..) {
 916                        entity.update(cx, |term, cx| {
 917                            term.inner().update(cx, |inner, cx| {
 918                                inner.write_output(&data, cx);
 919                            })
 920                        });
 921                    }
 922                }
 923
 924                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 925                    entity.update(cx, |_term, cx| {
 926                        cx.notify();
 927                    });
 928                }
 929
 930                cx.notify();
 931            }
 932            TerminalProviderEvent::Output { terminal_id, data } => {
 933                if let Some(entity) = self.terminals.get(&terminal_id) {
 934                    entity.update(cx, |term, cx| {
 935                        term.inner().update(cx, |inner, cx| {
 936                            inner.write_output(&data, cx);
 937                        })
 938                    });
 939                } else {
 940                    self.pending_terminal_output
 941                        .entry(terminal_id)
 942                        .or_default()
 943                        .push(data);
 944                }
 945            }
 946            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 947                if let Some(entity) = self.terminals.get(&terminal_id) {
 948                    entity.update(cx, |term, cx| {
 949                        term.inner().update(cx, |inner, cx| {
 950                            inner.breadcrumb_text = title;
 951                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 952                        })
 953                    });
 954                }
 955            }
 956            TerminalProviderEvent::Exit {
 957                terminal_id,
 958                status,
 959            } => {
 960                if let Some(entity) = self.terminals.get(&terminal_id) {
 961                    entity.update(cx, |_term, cx| {
 962                        cx.notify();
 963                    });
 964                } else {
 965                    self.pending_terminal_exit.insert(terminal_id, status);
 966                }
 967            }
 968        }
 969    }
 970}
 971
/// Coarse activity state of an `AcpThread`, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// The thread holds no send task.
    Idle,
    /// The thread currently holds a send task.
    Generating,
}
 977
/// Errors describing why an agent could not be loaded.
///
/// The user-facing text for each variant is produced by the `Display` impl.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported one.
    Unsupported {
        /// Where the unsupported version came from (rendered as "from {command}").
        command: SharedString,
        /// The version that was found.
        current_version: SharedString,
        /// The lowest version that is accepted.
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, carried as a preformatted message.
    Other(SharedString),
}
 991
 992impl Display for LoadError {
 993    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 994        match self {
 995            LoadError::Unsupported {
 996                command: path,
 997                current_version,
 998                minimum_version,
 999            } => {
1000                write!(
1001                    f,
1002                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1003                )
1004            }
1005            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1006            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1007            LoadError::Other(msg) => write!(f, "{msg}"),
1008        }
1009    }
1010}
1011
// Marker impl: `LoadError` relies on the default `std::error::Error` methods.
impl Error for LoadError {}
1013
1014impl AcpThread {
1015    pub fn new(
1016        title: impl Into<SharedString>,
1017        connection: Rc<dyn AgentConnection>,
1018        project: Entity<Project>,
1019        action_log: Entity<ActionLog>,
1020        session_id: acp::SessionId,
1021        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1022        cx: &mut Context<Self>,
1023    ) -> Self {
1024        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1025        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1026            loop {
1027                let caps = prompt_capabilities_rx.recv().await?;
1028                this.update(cx, |this, cx| {
1029                    this.prompt_capabilities = caps;
1030                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1031                })?;
1032            }
1033        });
1034
1035        Self {
1036            action_log,
1037            shared_buffers: Default::default(),
1038            entries: Default::default(),
1039            plan: Default::default(),
1040            title: title.into(),
1041            project,
1042            send_task: None,
1043            connection,
1044            session_id,
1045            token_usage: None,
1046            prompt_capabilities,
1047            _observe_prompt_capabilities: task,
1048            terminals: HashMap::default(),
1049            pending_terminal_output: HashMap::default(),
1050            pending_terminal_exit: HashMap::default(),
1051        }
1052    }
1053
    /// Returns a clone of the prompt capabilities currently stored on the thread.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1057
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1061
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1065
    /// Returns the project entity this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1069
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1073
    /// Returns all entries of the thread, oldest first (entries are appended via `push_entry`).
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1077
    /// Returns the id of the agent session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1081
1082    pub fn status(&self) -> ThreadStatus {
1083        if self.send_task.is_some() {
1084            ThreadStatus::Generating
1085        } else {
1086            ThreadStatus::Idle
1087        }
1088    }
1089
    /// Returns the last token usage recorded via `update_token_usage`, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1093
1094    pub fn has_pending_edit_tool_calls(&self) -> bool {
1095        for entry in self.entries.iter().rev() {
1096            match entry {
1097                AgentThreadEntry::UserMessage(_) => return false,
1098                AgentThreadEntry::ToolCall(
1099                    call @ ToolCall {
1100                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1101                        ..
1102                    },
1103                ) if call.diffs().next().is_some() => {
1104                    return true;
1105                }
1106                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1107            }
1108        }
1109
1110        false
1111    }
1112
1113    pub fn used_tools_since_last_user_message(&self) -> bool {
1114        for entry in self.entries.iter().rev() {
1115            match entry {
1116                AgentThreadEntry::UserMessage(..) => return false,
1117                AgentThreadEntry::AssistantMessage(..) => continue,
1118                AgentThreadEntry::ToolCall(..) => return true,
1119            }
1120        }
1121
1122        false
1123    }
1124
1125    pub fn handle_session_update(
1126        &mut self,
1127        update: acp::SessionUpdate,
1128        cx: &mut Context<Self>,
1129    ) -> Result<(), acp::Error> {
1130        match update {
1131            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1132                self.push_user_content_block(None, content, cx);
1133            }
1134            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1135                self.push_assistant_content_block(content, false, cx);
1136            }
1137            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1138                self.push_assistant_content_block(content, true, cx);
1139            }
1140            acp::SessionUpdate::ToolCall(tool_call) => {
1141                self.upsert_tool_call(tool_call, cx)?;
1142            }
1143            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1144                self.update_tool_call(tool_call_update, cx)?;
1145            }
1146            acp::SessionUpdate::Plan(plan) => {
1147                self.update_plan(plan, cx);
1148            }
1149            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1150                available_commands,
1151                ..
1152            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1153            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1154                current_mode_id,
1155                ..
1156            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1157        }
1158        Ok(())
1159    }
1160
1161    pub fn push_user_content_block(
1162        &mut self,
1163        message_id: Option<UserMessageId>,
1164        chunk: acp::ContentBlock,
1165        cx: &mut Context<Self>,
1166    ) {
1167        let language_registry = self.project.read(cx).languages().clone();
1168        let path_style = self.project.read(cx).path_style(cx);
1169        let entries_len = self.entries.len();
1170
1171        if let Some(last_entry) = self.entries.last_mut()
1172            && let AgentThreadEntry::UserMessage(UserMessage {
1173                id,
1174                content,
1175                chunks,
1176                ..
1177            }) = last_entry
1178        {
1179            *id = message_id.or(id.take());
1180            content.append(chunk.clone(), &language_registry, path_style, cx);
1181            chunks.push(chunk);
1182            let idx = entries_len - 1;
1183            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1184        } else {
1185            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1186            self.push_entry(
1187                AgentThreadEntry::UserMessage(UserMessage {
1188                    id: message_id,
1189                    content,
1190                    chunks: vec![chunk],
1191                    checkpoint: None,
1192                }),
1193                cx,
1194            );
1195        }
1196    }
1197
1198    pub fn push_assistant_content_block(
1199        &mut self,
1200        chunk: acp::ContentBlock,
1201        is_thought: bool,
1202        cx: &mut Context<Self>,
1203    ) {
1204        let language_registry = self.project.read(cx).languages().clone();
1205        let path_style = self.project.read(cx).path_style(cx);
1206        let entries_len = self.entries.len();
1207        if let Some(last_entry) = self.entries.last_mut()
1208            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1209        {
1210            let idx = entries_len - 1;
1211            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1212            match (chunks.last_mut(), is_thought) {
1213                (Some(AssistantMessageChunk::Message { block }), false)
1214                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1215                    block.append(chunk, &language_registry, path_style, cx)
1216                }
1217                _ => {
1218                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1219                    if is_thought {
1220                        chunks.push(AssistantMessageChunk::Thought { block })
1221                    } else {
1222                        chunks.push(AssistantMessageChunk::Message { block })
1223                    }
1224                }
1225            }
1226        } else {
1227            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1228            let chunk = if is_thought {
1229                AssistantMessageChunk::Thought { block }
1230            } else {
1231                AssistantMessageChunk::Message { block }
1232            };
1233
1234            self.push_entry(
1235                AgentThreadEntry::AssistantMessage(AssistantMessage {
1236                    chunks: vec![chunk],
1237                }),
1238                cx,
1239            );
1240        }
1241    }
1242
    /// Appends `entry` to the end of the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1247
    /// Returns `true` if the connection offers a title setter for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1251
1252    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1253        if title != self.title {
1254            self.title = title.clone();
1255            cx.emit(AcpThreadEvent::TitleUpdated);
1256            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1257                return set_title.run(title, cx);
1258            }
1259        }
1260        Task::ready(Ok(()))
1261    }
1262
    /// Replaces the stored token usage (`None` clears it) and emits `TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1267
    /// Forwards `status` to observers as an `AcpThreadEvent::Retry` event; no state is stored.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1271
1272    pub fn update_tool_call(
1273        &mut self,
1274        update: impl Into<ToolCallUpdate>,
1275        cx: &mut Context<Self>,
1276    ) -> Result<()> {
1277        let update = update.into();
1278        let languages = self.project.read(cx).languages().clone();
1279        let path_style = self.project.read(cx).path_style(cx);
1280
1281        let ix = match self.index_for_tool_call(update.id()) {
1282            Some(ix) => ix,
1283            None => {
1284                // Tool call not found - create a failed tool call entry
1285                let failed_tool_call = ToolCall {
1286                    id: update.id().clone(),
1287                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1288                    kind: acp::ToolKind::Fetch,
1289                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1290                        acp::ContentBlock::Text(acp::TextContent {
1291                            text: "Tool call not found".to_string(),
1292                            annotations: None,
1293                            meta: None,
1294                        }),
1295                        &languages,
1296                        path_style,
1297                        cx,
1298                    ))],
1299                    status: ToolCallStatus::Failed,
1300                    locations: Vec::new(),
1301                    resolved_locations: Vec::new(),
1302                    raw_input: None,
1303                    raw_output: None,
1304                };
1305                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1306                return Ok(());
1307            }
1308        };
1309        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1310            unreachable!()
1311        };
1312
1313        match update {
1314            ToolCallUpdate::UpdateFields(update) => {
1315                let location_updated = update.fields.locations.is_some();
1316                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1317                if location_updated {
1318                    self.resolve_locations(update.id, cx);
1319                }
1320            }
1321            ToolCallUpdate::UpdateDiff(update) => {
1322                call.content.clear();
1323                call.content.push(ToolCallContent::Diff(update.diff));
1324            }
1325            ToolCallUpdate::UpdateTerminal(update) => {
1326                call.content.clear();
1327                call.content
1328                    .push(ToolCallContent::Terminal(update.terminal));
1329            }
1330        }
1331
1332        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1333
1334        Ok(())
1335    }
1336
1337    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1338    pub fn upsert_tool_call(
1339        &mut self,
1340        tool_call: acp::ToolCall,
1341        cx: &mut Context<Self>,
1342    ) -> Result<(), acp::Error> {
1343        let status = tool_call.status.into();
1344        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1345    }
1346
1347    /// Fails if id does not match an existing entry.
1348    pub fn upsert_tool_call_inner(
1349        &mut self,
1350        update: acp::ToolCallUpdate,
1351        status: ToolCallStatus,
1352        cx: &mut Context<Self>,
1353    ) -> Result<(), acp::Error> {
1354        let language_registry = self.project.read(cx).languages().clone();
1355        let path_style = self.project.read(cx).path_style(cx);
1356        let id = update.id.clone();
1357
1358        let agent = self.connection().telemetry_id();
1359        let session = self.session_id();
1360        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1361            let status = if matches!(status, ToolCallStatus::Completed) {
1362                "completed"
1363            } else {
1364                "failed"
1365            };
1366            telemetry::event!("Agent Tool Call Completed", agent, session, status);
1367        }
1368
1369        if let Some(ix) = self.index_for_tool_call(&id) {
1370            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1371                unreachable!()
1372            };
1373
1374            call.update_fields(
1375                update.fields,
1376                language_registry,
1377                path_style,
1378                &self.terminals,
1379                cx,
1380            )?;
1381            call.status = status;
1382
1383            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1384        } else {
1385            let call = ToolCall::from_acp(
1386                update.try_into()?,
1387                status,
1388                language_registry,
1389                self.project.read(cx).path_style(cx),
1390                &self.terminals,
1391                cx,
1392            )?;
1393            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1394        };
1395
1396        self.resolve_locations(id, cx);
1397        Ok(())
1398    }
1399
1400    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1401        self.entries
1402            .iter()
1403            .enumerate()
1404            .rev()
1405            .find_map(|(index, entry)| {
1406                if let AgentThreadEntry::ToolCall(tool_call) = entry
1407                    && &tool_call.id == id
1408                {
1409                    Some(index)
1410                } else {
1411                    None
1412                }
1413            })
1414    }
1415
1416    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1417        // The tool call we are looking for is typically the last one, or very close to the end.
1418        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1419        self.entries
1420            .iter_mut()
1421            .enumerate()
1422            .rev()
1423            .find_map(|(index, tool_call)| {
1424                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1425                    && &tool_call.id == id
1426                {
1427                    Some((index, tool_call))
1428                } else {
1429                    None
1430                }
1431            })
1432    }
1433
1434    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1435        self.entries
1436            .iter()
1437            .enumerate()
1438            .rev()
1439            .find_map(|(index, tool_call)| {
1440                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1441                    && &tool_call.id == id
1442                {
1443                    Some((index, tool_call))
1444                } else {
1445                    None
1446                }
1447            })
1448    }
1449
    /// Resolves the locations of the tool call with the given id in the background.
    ///
    /// Does nothing if no such tool call exists. Once resolution finishes, a
    /// snapshot of every resolved buffer is recorded in `shared_buffers`, the
    /// project's agent location is moved to the last resolved location (with
    /// one exception, see below), and the tool call's `resolved_locations` are
    /// replaced, emitting `EntryUpdated` if they changed.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call may no longer be present by the time resolution finishes.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // Ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line.
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only notify observers when the resolved locations actually changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1506
1507    pub fn request_tool_call_authorization(
1508        &mut self,
1509        tool_call: acp::ToolCallUpdate,
1510        options: Vec<acp::PermissionOption>,
1511        respect_always_allow_setting: bool,
1512        cx: &mut Context<Self>,
1513    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1514        let (tx, rx) = oneshot::channel();
1515
1516        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1517            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1518            // some tools would (incorrectly) continue to auto-accept.
1519            if let Some(allow_once_option) = options.iter().find_map(|option| {
1520                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1521                    Some(option.id.clone())
1522                } else {
1523                    None
1524                }
1525            }) {
1526                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1527                return Ok(async {
1528                    acp::RequestPermissionOutcome::Selected {
1529                        option_id: allow_once_option,
1530                    }
1531                }
1532                .boxed());
1533            }
1534        }
1535
1536        let status = ToolCallStatus::WaitingForConfirmation {
1537            options,
1538            respond_tx: tx,
1539        };
1540
1541        self.upsert_tool_call_inner(tool_call, status, cx)?;
1542        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1543
1544        let fut = async {
1545            match rx.await {
1546                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1547                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1548            }
1549        }
1550        .boxed();
1551
1552        Ok(fut)
1553    }
1554
1555    pub fn authorize_tool_call(
1556        &mut self,
1557        id: acp::ToolCallId,
1558        option_id: acp::PermissionOptionId,
1559        option_kind: acp::PermissionOptionKind,
1560        cx: &mut Context<Self>,
1561    ) {
1562        let Some((ix, call)) = self.tool_call_mut(&id) else {
1563            return;
1564        };
1565
1566        let new_status = match option_kind {
1567            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1568                ToolCallStatus::Rejected
1569            }
1570            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1571                ToolCallStatus::InProgress
1572            }
1573        };
1574
1575        let curr_status = mem::replace(&mut call.status, new_status);
1576
1577        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1578            respond_tx.send(option_id).log_err();
1579        } else if cfg!(debug_assertions) {
1580            panic!("tried to authorize an already authorized tool call");
1581        }
1582
1583        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1584    }
1585
1586    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1587        let mut first_tool_call = None;
1588
1589        for entry in self.entries.iter().rev() {
1590            match &entry {
1591                AgentThreadEntry::ToolCall(call) => {
1592                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1593                        first_tool_call = Some(call);
1594                    } else {
1595                        continue;
1596                    }
1597                }
1598                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1599                    // Reached the beginning of the turn.
1600                    // If we had pending permission requests in the previous turn, they have been cancelled.
1601                    break;
1602                }
1603            }
1604        }
1605
1606        first_tool_call
1607    }
1608
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1612
1613    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1614        let new_entries_len = request.entries.len();
1615        let mut new_entries = request.entries.into_iter();
1616
1617        // Reuse existing markdown to prevent flickering
1618        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1619            let PlanEntry {
1620                content,
1621                priority,
1622                status,
1623            } = old;
1624            content.update(cx, |old, cx| {
1625                old.replace(new.content, cx);
1626            });
1627            *priority = new.priority;
1628            *status = new.status;
1629        }
1630        for new in new_entries {
1631            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1632        }
1633        self.plan.entries.truncate(new_entries_len);
1634
1635        cx.notify();
1636    }
1637
1638    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1639        self.plan
1640            .entries
1641            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1642        cx.notify();
1643    }
1644
1645    #[cfg(any(test, feature = "test-support"))]
1646    pub fn send_raw(
1647        &mut self,
1648        message: &str,
1649        cx: &mut Context<Self>,
1650    ) -> BoxFuture<'static, Result<()>> {
1651        self.send(
1652            vec![acp::ContentBlock::Text(acp::TextContent {
1653                text: message.to_string(),
1654                annotations: None,
1655                meta: None,
1656            })],
1657            cx,
1658        )
1659    }
1660
1661    pub fn send(
1662        &mut self,
1663        message: Vec<acp::ContentBlock>,
1664        cx: &mut Context<Self>,
1665    ) -> BoxFuture<'static, Result<()>> {
1666        let block = ContentBlock::new_combined(
1667            message.clone(),
1668            self.project.read(cx).languages().clone(),
1669            self.project.read(cx).path_style(cx),
1670            cx,
1671        );
1672        let request = acp::PromptRequest {
1673            prompt: message.clone(),
1674            session_id: self.session_id.clone(),
1675            meta: None,
1676        };
1677        let git_store = self.project.read(cx).git_store().clone();
1678
1679        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1680            Some(UserMessageId::new())
1681        } else {
1682            None
1683        };
1684
1685        self.run_turn(cx, async move |this, cx| {
1686            this.update(cx, |this, cx| {
1687                this.push_entry(
1688                    AgentThreadEntry::UserMessage(UserMessage {
1689                        id: message_id.clone(),
1690                        content: block,
1691                        chunks: message,
1692                        checkpoint: None,
1693                    }),
1694                    cx,
1695                );
1696            })
1697            .ok();
1698
1699            let old_checkpoint = git_store
1700                .update(cx, |git, cx| git.checkpoint(cx))?
1701                .await
1702                .context("failed to get old checkpoint")
1703                .log_err();
1704            this.update(cx, |this, cx| {
1705                if let Some((_ix, message)) = this.last_user_message() {
1706                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1707                        git_checkpoint,
1708                        show: false,
1709                    });
1710                }
1711                this.connection.prompt(message_id, request, cx)
1712            })?
1713            .await
1714        })
1715    }
1716
    /// Returns `true` if the connection offers a way to resume this session.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1720
1721    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1722        self.run_turn(cx, async move |this, cx| {
1723            this.update(cx, |this, cx| {
1724                this.connection
1725                    .resume(&this.session_id, cx)
1726                    .map(|resume| resume.run(cx))
1727            })?
1728            .context("resuming a session is not supported")?
1729            .await
1730        })
1731    }
1732
1733    fn run_turn(
1734        &mut self,
1735        cx: &mut Context<Self>,
1736        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1737    ) -> BoxFuture<'static, Result<()>> {
1738        self.clear_completed_plan_entries(cx);
1739
1740        let (tx, rx) = oneshot::channel();
1741        let cancel_task = self.cancel(cx);
1742
1743        self.send_task = Some(cx.spawn(async move |this, cx| {
1744            cancel_task.await;
1745            tx.send(f(this, cx).await).ok();
1746        }));
1747
1748        cx.spawn(async move |this, cx| {
1749            let response = rx.await;
1750
1751            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1752                .await?;
1753
1754            this.update(cx, |this, cx| {
1755                this.project
1756                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1757                match response {
1758                    Ok(Err(e)) => {
1759                        this.send_task.take();
1760                        cx.emit(AcpThreadEvent::Error);
1761                        Err(e)
1762                    }
1763                    result => {
1764                        let canceled = matches!(
1765                            result,
1766                            Ok(Ok(acp::PromptResponse {
1767                                stop_reason: acp::StopReason::Cancelled,
1768                                meta: None,
1769                            }))
1770                        );
1771
1772                        // We only take the task if the current prompt wasn't canceled.
1773                        //
1774                        // This prompt may have been canceled because another one was sent
1775                        // while it was still generating. In these cases, dropping `send_task`
1776                        // would cause the next generation to be canceled.
1777                        if !canceled {
1778                            this.send_task.take();
1779                        }
1780
1781                        // Handle refusal - distinguish between user prompt and tool call refusals
1782                        if let Ok(Ok(acp::PromptResponse {
1783                            stop_reason: acp::StopReason::Refusal,
1784                            meta: _,
1785                        })) = result
1786                        {
1787                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1788                                // Check if there's a completed tool call with results after the last user message
1789                                // This indicates the refusal is in response to tool output, not the user's prompt
1790                                let has_completed_tool_call_after_user_msg =
1791                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1792                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1793                                            // Check if the tool call has completed and has output
1794                                            matches!(tool_call.status, ToolCallStatus::Completed)
1795                                                && tool_call.raw_output.is_some()
1796                                        } else {
1797                                            false
1798                                        }
1799                                    });
1800
1801                                if has_completed_tool_call_after_user_msg {
1802                                    // Refusal is due to tool output - don't truncate, just notify
1803                                    // The model refused based on what the tool returned
1804                                    cx.emit(AcpThreadEvent::Refusal);
1805                                } else {
1806                                    // User prompt was refused - truncate back to before the user message
1807                                    let range = user_msg_ix..this.entries.len();
1808                                    if range.start < range.end {
1809                                        this.entries.truncate(user_msg_ix);
1810                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1811                                    }
1812                                    cx.emit(AcpThreadEvent::Refusal);
1813                                }
1814                            } else {
1815                                // No user message found, treat as general refusal
1816                                cx.emit(AcpThreadEvent::Refusal);
1817                            }
1818                        }
1819
1820                        cx.emit(AcpThreadEvent::Stopped);
1821                        Ok(())
1822                    }
1823                }
1824            })?
1825        })
1826        .boxed()
1827    }
1828
1829    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1830        let Some(send_task) = self.send_task.take() else {
1831            return Task::ready(());
1832        };
1833
1834        for entry in self.entries.iter_mut() {
1835            if let AgentThreadEntry::ToolCall(call) = entry {
1836                let cancel = matches!(
1837                    call.status,
1838                    ToolCallStatus::Pending
1839                        | ToolCallStatus::WaitingForConfirmation { .. }
1840                        | ToolCallStatus::InProgress
1841                );
1842
1843                if cancel {
1844                    call.status = ToolCallStatus::Canceled;
1845                }
1846            }
1847        }
1848
1849        self.connection.cancel(&self.session_id, cx);
1850
1851        // Wait for the send task to complete
1852        cx.foreground_executor().spawn(send_task)
1853    }
1854
1855    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1856    pub fn restore_checkpoint(
1857        &mut self,
1858        id: UserMessageId,
1859        cx: &mut Context<Self>,
1860    ) -> Task<Result<()>> {
1861        let Some((_, message)) = self.user_message_mut(&id) else {
1862            return Task::ready(Err(anyhow!("message not found")));
1863        };
1864
1865        let checkpoint = message
1866            .checkpoint
1867            .as_ref()
1868            .map(|c| c.git_checkpoint.clone());
1869        let rewind = self.rewind(id.clone(), cx);
1870        let git_store = self.project.read(cx).git_store().clone();
1871
1872        cx.spawn(async move |_, cx| {
1873            rewind.await?;
1874            if let Some(checkpoint) = checkpoint {
1875                git_store
1876                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1877                    .await?;
1878            }
1879
1880            Ok(())
1881        })
1882    }
1883
1884    /// Rewinds this thread to before the entry at `index`, removing it and all
1885    /// subsequent entries while rejecting any action_log changes made from that point.
1886    /// Unlike `restore_checkpoint`, this method does not restore from git.
1887    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1888        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1889            return Task::ready(Err(anyhow!("not supported")));
1890        };
1891
1892        let telemetry = ActionLogTelemetry::from(&*self);
1893        cx.spawn(async move |this, cx| {
1894            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1895            this.update(cx, |this, cx| {
1896                if let Some((ix, _)) = this.user_message_mut(&id) {
1897                    let range = ix..this.entries.len();
1898                    this.entries.truncate(ix);
1899                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1900                }
1901                this.action_log().update(cx, |action_log, cx| {
1902                    action_log.reject_all_edits(Some(telemetry), cx)
1903                })
1904            })?
1905            .await;
1906            Ok(())
1907        })
1908    }
1909
1910    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1911        let git_store = self.project.read(cx).git_store().clone();
1912
1913        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1914            if let Some(checkpoint) = message.checkpoint.as_ref() {
1915                checkpoint.git_checkpoint.clone()
1916            } else {
1917                return Task::ready(Ok(()));
1918            }
1919        } else {
1920            return Task::ready(Ok(()));
1921        };
1922
1923        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1924        cx.spawn(async move |this, cx| {
1925            let new_checkpoint = new_checkpoint
1926                .await
1927                .context("failed to get new checkpoint")
1928                .log_err();
1929            if let Some(new_checkpoint) = new_checkpoint {
1930                let equal = git_store
1931                    .update(cx, |git, cx| {
1932                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1933                    })?
1934                    .await
1935                    .unwrap_or(true);
1936                this.update(cx, |this, cx| {
1937                    let (ix, message) = this.last_user_message().context("no user message")?;
1938                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1939                    checkpoint.show = !equal;
1940                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1941                    anyhow::Ok(())
1942                })??;
1943            }
1944
1945            Ok(())
1946        })
1947    }
1948
1949    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1950        self.entries
1951            .iter_mut()
1952            .enumerate()
1953            .rev()
1954            .find_map(|(ix, entry)| {
1955                if let AgentThreadEntry::UserMessage(message) = entry {
1956                    Some((ix, message))
1957                } else {
1958                    None
1959                }
1960            })
1961    }
1962
1963    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1964        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1965            if let AgentThreadEntry::UserMessage(message) = entry {
1966                if message.id.as_ref() == Some(id) {
1967                    Some((ix, message))
1968                } else {
1969                    None
1970                }
1971            } else {
1972                None
1973            }
1974        })
1975    }
1976
1977    pub fn read_text_file(
1978        &self,
1979        path: PathBuf,
1980        line: Option<u32>,
1981        limit: Option<u32>,
1982        reuse_shared_snapshot: bool,
1983        cx: &mut Context<Self>,
1984    ) -> Task<Result<String, acp::Error>> {
1985        // Args are 1-based, move to 0-based
1986        let line = line.unwrap_or_default().saturating_sub(1);
1987        let limit = limit.unwrap_or(u32::MAX);
1988        let project = self.project.clone();
1989        let action_log = self.action_log.clone();
1990        cx.spawn(async move |this, cx| {
1991            let load = project
1992                .update(cx, |project, cx| {
1993                    let path = project
1994                        .project_path_for_absolute_path(&path, cx)
1995                        .ok_or_else(|| {
1996                            acp::Error::resource_not_found(Some(path.display().to_string()))
1997                        })?;
1998                    Ok(project.open_buffer(path, cx))
1999                })
2000                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
2001                .flatten()?;
2002
2003            let buffer = load.await?;
2004
2005            let snapshot = if reuse_shared_snapshot {
2006                this.read_with(cx, |this, _| {
2007                    this.shared_buffers.get(&buffer.clone()).cloned()
2008                })
2009                .log_err()
2010                .flatten()
2011            } else {
2012                None
2013            };
2014
2015            let snapshot = if let Some(snapshot) = snapshot {
2016                snapshot
2017            } else {
2018                action_log.update(cx, |action_log, cx| {
2019                    action_log.buffer_read(buffer.clone(), cx);
2020                })?;
2021
2022                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2023                this.update(cx, |this, _| {
2024                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2025                })?;
2026                snapshot
2027            };
2028
2029            let max_point = snapshot.max_point();
2030            let start_position = Point::new(line, 0);
2031
2032            if start_position > max_point {
2033                return Err(acp::Error::invalid_params().with_data(format!(
2034                    "Attempting to read beyond the end of the file, line {}:{}",
2035                    max_point.row + 1,
2036                    max_point.column
2037                )));
2038            }
2039
2040            let start = snapshot.anchor_before(start_position);
2041            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2042
2043            project.update(cx, |project, cx| {
2044                project.set_agent_location(
2045                    Some(AgentLocation {
2046                        buffer: buffer.downgrade(),
2047                        position: start,
2048                    }),
2049                    cx,
2050                );
2051            })?;
2052
2053            Ok(snapshot.text_for_range(start..end).collect::<String>())
2054        })
2055    }
2056
2057    pub fn write_text_file(
2058        &self,
2059        path: PathBuf,
2060        content: String,
2061        cx: &mut Context<Self>,
2062    ) -> Task<Result<()>> {
2063        let project = self.project.clone();
2064        let action_log = self.action_log.clone();
2065        cx.spawn(async move |this, cx| {
2066            let load = project.update(cx, |project, cx| {
2067                let path = project
2068                    .project_path_for_absolute_path(&path, cx)
2069                    .context("invalid path")?;
2070                anyhow::Ok(project.open_buffer(path, cx))
2071            });
2072            let buffer = load??.await?;
2073            let snapshot = this.update(cx, |this, cx| {
2074                this.shared_buffers
2075                    .get(&buffer)
2076                    .cloned()
2077                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2078            })?;
2079            let edits = cx
2080                .background_executor()
2081                .spawn(async move {
2082                    let old_text = snapshot.text();
2083                    text_diff(old_text.as_str(), &content)
2084                        .into_iter()
2085                        .map(|(range, replacement)| {
2086                            (
2087                                snapshot.anchor_after(range.start)
2088                                    ..snapshot.anchor_before(range.end),
2089                                replacement,
2090                            )
2091                        })
2092                        .collect::<Vec<_>>()
2093                })
2094                .await;
2095
2096            project.update(cx, |project, cx| {
2097                project.set_agent_location(
2098                    Some(AgentLocation {
2099                        buffer: buffer.downgrade(),
2100                        position: edits
2101                            .last()
2102                            .map(|(range, _)| range.end)
2103                            .unwrap_or(Anchor::MIN),
2104                    }),
2105                    cx,
2106                );
2107            })?;
2108
2109            let format_on_save = cx.update(|cx| {
2110                action_log.update(cx, |action_log, cx| {
2111                    action_log.buffer_read(buffer.clone(), cx);
2112                });
2113
2114                let format_on_save = buffer.update(cx, |buffer, cx| {
2115                    buffer.edit(edits, None, cx);
2116
2117                    let settings = language::language_settings::language_settings(
2118                        buffer.language().map(|l| l.name()),
2119                        buffer.file(),
2120                        cx,
2121                    );
2122
2123                    settings.format_on_save != FormatOnSave::Off
2124                });
2125                action_log.update(cx, |action_log, cx| {
2126                    action_log.buffer_edited(buffer.clone(), cx);
2127                });
2128                format_on_save
2129            })?;
2130
2131            if format_on_save {
2132                let format_task = project.update(cx, |project, cx| {
2133                    project.format(
2134                        HashSet::from_iter([buffer.clone()]),
2135                        LspFormatTarget::Buffers,
2136                        false,
2137                        FormatTrigger::Save,
2138                        cx,
2139                    )
2140                })?;
2141                format_task.await.log_err();
2142
2143                action_log.update(cx, |action_log, cx| {
2144                    action_log.buffer_edited(buffer.clone(), cx);
2145                })?;
2146            }
2147
2148            project
2149                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2150                .await
2151        })
2152    }
2153
2154    pub fn create_terminal(
2155        &self,
2156        command: String,
2157        args: Vec<String>,
2158        extra_env: Vec<acp::EnvVariable>,
2159        cwd: Option<PathBuf>,
2160        output_byte_limit: Option<u64>,
2161        cx: &mut Context<Self>,
2162    ) -> Task<Result<Entity<Terminal>>> {
2163        let env = match &cwd {
2164            Some(dir) => self.project.update(cx, |project, cx| {
2165                project.environment().update(cx, |env, cx| {
2166                    env.directory_environment(dir.as_path().into(), cx)
2167                })
2168            }),
2169            None => Task::ready(None).shared(),
2170        };
2171        let env = cx.spawn(async move |_, _| {
2172            let mut env = env.await.unwrap_or_default();
2173            // Disables paging for `git` and hopefully other commands
2174            env.insert("PAGER".into(), "".into());
2175            for var in extra_env {
2176                env.insert(var.name, var.value);
2177            }
2178            env
2179        });
2180
2181        let project = self.project.clone();
2182        let language_registry = project.read(cx).languages().clone();
2183        let is_windows = project.read(cx).path_style(cx).is_windows();
2184
2185        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2186        let terminal_task = cx.spawn({
2187            let terminal_id = terminal_id.clone();
2188            async move |_this, cx| {
2189                let env = env.await;
2190                let shell = project
2191                    .update(cx, |project, cx| {
2192                        project
2193                            .remote_client()
2194                            .and_then(|r| r.read(cx).default_system_shell())
2195                    })?
2196                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2197                let (task_command, task_args) =
2198                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2199                        .redirect_stdin_to_dev_null()
2200                        .build(Some(command.clone()), &args);
2201                let terminal = project
2202                    .update(cx, |project, cx| {
2203                        project.create_terminal_task(
2204                            task::SpawnInTerminal {
2205                                command: Some(task_command),
2206                                args: task_args,
2207                                cwd: cwd.clone(),
2208                                env,
2209                                ..Default::default()
2210                            },
2211                            cx,
2212                        )
2213                    })?
2214                    .await?;
2215
2216                cx.new(|cx| {
2217                    Terminal::new(
2218                        terminal_id,
2219                        &format!("{} {}", command, args.join(" ")),
2220                        cwd,
2221                        output_byte_limit.map(|l| l as usize),
2222                        terminal,
2223                        language_registry,
2224                        cx,
2225                    )
2226                })
2227            }
2228        });
2229
2230        cx.spawn(async move |this, cx| {
2231            let terminal = terminal_task.await?;
2232            this.update(cx, |this, _cx| {
2233                this.terminals.insert(terminal_id, terminal.clone());
2234                terminal
2235            })
2236        })
2237    }
2238
2239    pub fn kill_terminal(
2240        &mut self,
2241        terminal_id: acp::TerminalId,
2242        cx: &mut Context<Self>,
2243    ) -> Result<()> {
2244        self.terminals
2245            .get(&terminal_id)
2246            .context("Terminal not found")?
2247            .update(cx, |terminal, cx| {
2248                terminal.kill(cx);
2249            });
2250
2251        Ok(())
2252    }
2253
2254    pub fn release_terminal(
2255        &mut self,
2256        terminal_id: acp::TerminalId,
2257        cx: &mut Context<Self>,
2258    ) -> Result<()> {
2259        self.terminals
2260            .remove(&terminal_id)
2261            .context("Terminal not found")?
2262            .update(cx, |terminal, cx| {
2263                terminal.kill(cx);
2264            });
2265
2266        Ok(())
2267    }
2268
2269    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2270        self.terminals
2271            .get(&terminal_id)
2272            .context("Terminal not found")
2273            .cloned()
2274    }
2275
2276    pub fn to_markdown(&self, cx: &App) -> String {
2277        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2278    }
2279
2280    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2281        cx.emit(AcpThreadEvent::LoadError(error));
2282    }
2283
2284    pub fn register_terminal_created(
2285        &mut self,
2286        terminal_id: acp::TerminalId,
2287        command_label: String,
2288        working_dir: Option<PathBuf>,
2289        output_byte_limit: Option<u64>,
2290        terminal: Entity<::terminal::Terminal>,
2291        cx: &mut Context<Self>,
2292    ) -> Entity<Terminal> {
2293        let language_registry = self.project.read(cx).languages().clone();
2294
2295        let entity = cx.new(|cx| {
2296            Terminal::new(
2297                terminal_id.clone(),
2298                &command_label,
2299                working_dir.clone(),
2300                output_byte_limit.map(|l| l as usize),
2301                terminal,
2302                language_registry,
2303                cx,
2304            )
2305        });
2306        self.terminals.insert(terminal_id.clone(), entity.clone());
2307        entity
2308    }
2309}
2310
2311fn markdown_for_raw_output(
2312    raw_output: &serde_json::Value,
2313    language_registry: &Arc<LanguageRegistry>,
2314    cx: &mut App,
2315) -> Option<Entity<Markdown>> {
2316    match raw_output {
2317        serde_json::Value::Null => None,
2318        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2319            Markdown::new(
2320                value.to_string().into(),
2321                Some(language_registry.clone()),
2322                None,
2323                cx,
2324            )
2325        })),
2326        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2327            Markdown::new(
2328                value.to_string().into(),
2329                Some(language_registry.clone()),
2330                None,
2331                cx,
2332            )
2333        })),
2334        serde_json::Value::String(value) => Some(cx.new(|cx| {
2335            Markdown::new(
2336                value.clone().into(),
2337                Some(language_registry.clone()),
2338                None,
2339                cx,
2340            )
2341        })),
2342        value => Some(cx.new(|cx| {
2343            Markdown::new(
2344                format!("```json\n{}\n```", value).into(),
2345                Some(language_registry.clone()),
2346                None,
2347                cx,
2348            )
2349        })),
2350    }
2351}
2352
2353#[cfg(test)]
2354mod tests {
2355    use super::*;
2356    use anyhow::anyhow;
2357    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2358    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2359    use indoc::indoc;
2360    use project::{FakeFs, Fs};
2361    use rand::{distr, prelude::*};
2362    use serde_json::json;
2363    use settings::SettingsStore;
2364    use smol::stream::StreamExt as _;
2365    use std::{
2366        any::Any,
2367        cell::RefCell,
2368        path::Path,
2369        rc::Rc,
2370        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2371        time::Duration,
2372    };
2373    use util::path;
2374
2375    fn init_test(cx: &mut TestAppContext) {
2376        env_logger::try_init().ok();
2377        cx.update(|cx| {
2378            let settings_store = SettingsStore::test(cx);
2379            cx.set_global(settings_store);
2380        });
2381    }
2382
2383    #[gpui::test]
2384    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2385        init_test(cx);
2386
2387        let fs = FakeFs::new(cx.executor());
2388        let project = Project::test(fs, [], cx).await;
2389        let connection = Rc::new(FakeAgentConnection::new());
2390        let thread = cx
2391            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2392            .await
2393            .unwrap();
2394
2395        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2396
2397        // Send Output BEFORE Created - should be buffered by acp_thread
2398        thread.update(cx, |thread, cx| {
2399            thread.on_terminal_provider_event(
2400                TerminalProviderEvent::Output {
2401                    terminal_id: terminal_id.clone(),
2402                    data: b"hello buffered".to_vec(),
2403                },
2404                cx,
2405            );
2406        });
2407
2408        // Create a display-only terminal and then send Created
2409        let lower = cx.new(|cx| {
2410            let builder = ::terminal::TerminalBuilder::new_display_only(
2411                ::terminal::terminal_settings::CursorShape::default(),
2412                ::terminal::terminal_settings::AlternateScroll::On,
2413                None,
2414                0,
2415            )
2416            .unwrap();
2417            builder.subscribe(cx)
2418        });
2419
2420        thread.update(cx, |thread, cx| {
2421            thread.on_terminal_provider_event(
2422                TerminalProviderEvent::Created {
2423                    terminal_id: terminal_id.clone(),
2424                    label: "Buffered Test".to_string(),
2425                    cwd: None,
2426                    output_byte_limit: None,
2427                    terminal: lower.clone(),
2428                },
2429                cx,
2430            );
2431        });
2432
2433        // After Created, buffered Output should have been flushed into the renderer
2434        let content = thread.read_with(cx, |thread, cx| {
2435            let term = thread.terminal(terminal_id.clone()).unwrap();
2436            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2437        });
2438
2439        assert!(
2440            content.contains("hello buffered"),
2441            "expected buffered output to render, got: {content}"
2442        );
2443    }
2444
2445    #[gpui::test]
2446    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2447        init_test(cx);
2448
2449        let fs = FakeFs::new(cx.executor());
2450        let project = Project::test(fs, [], cx).await;
2451        let connection = Rc::new(FakeAgentConnection::new());
2452        let thread = cx
2453            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2454            .await
2455            .unwrap();
2456
2457        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2458
2459        // Send Output BEFORE Created
2460        thread.update(cx, |thread, cx| {
2461            thread.on_terminal_provider_event(
2462                TerminalProviderEvent::Output {
2463                    terminal_id: terminal_id.clone(),
2464                    data: b"pre-exit data".to_vec(),
2465                },
2466                cx,
2467            );
2468        });
2469
2470        // Send Exit BEFORE Created
2471        thread.update(cx, |thread, cx| {
2472            thread.on_terminal_provider_event(
2473                TerminalProviderEvent::Exit {
2474                    terminal_id: terminal_id.clone(),
2475                    status: acp::TerminalExitStatus {
2476                        exit_code: Some(0),
2477                        signal: None,
2478                        meta: None,
2479                    },
2480                },
2481                cx,
2482            );
2483        });
2484
2485        // Now create a display-only lower-level terminal and send Created
2486        let lower = cx.new(|cx| {
2487            let builder = ::terminal::TerminalBuilder::new_display_only(
2488                ::terminal::terminal_settings::CursorShape::default(),
2489                ::terminal::terminal_settings::AlternateScroll::On,
2490                None,
2491                0,
2492            )
2493            .unwrap();
2494            builder.subscribe(cx)
2495        });
2496
2497        thread.update(cx, |thread, cx| {
2498            thread.on_terminal_provider_event(
2499                TerminalProviderEvent::Created {
2500                    terminal_id: terminal_id.clone(),
2501                    label: "Buffered Exit Test".to_string(),
2502                    cwd: None,
2503                    output_byte_limit: None,
2504                    terminal: lower.clone(),
2505                },
2506                cx,
2507            );
2508        });
2509
2510        // Output should be present after Created (flushed from buffer)
2511        let content = thread.read_with(cx, |thread, cx| {
2512            let term = thread.terminal(terminal_id.clone()).unwrap();
2513            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2514        });
2515
2516        assert!(
2517            content.contains("pre-exit data"),
2518            "expected pre-exit data to render, got: {content}"
2519        );
2520    }
2521
2522    #[gpui::test]
2523    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2524        init_test(cx);
2525
2526        let fs = FakeFs::new(cx.executor());
2527        let project = Project::test(fs, [], cx).await;
2528        let connection = Rc::new(FakeAgentConnection::new());
2529        let thread = cx
2530            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2531            .await
2532            .unwrap();
2533
2534        // Test creating a new user message
2535        thread.update(cx, |thread, cx| {
2536            thread.push_user_content_block(
2537                None,
2538                acp::ContentBlock::Text(acp::TextContent {
2539                    annotations: None,
2540                    text: "Hello, ".to_string(),
2541                    meta: None,
2542                }),
2543                cx,
2544            );
2545        });
2546
2547        thread.update(cx, |thread, cx| {
2548            assert_eq!(thread.entries.len(), 1);
2549            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2550                assert_eq!(user_msg.id, None);
2551                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2552            } else {
2553                panic!("Expected UserMessage");
2554            }
2555        });
2556
2557        // Test appending to existing user message
2558        let message_1_id = UserMessageId::new();
2559        thread.update(cx, |thread, cx| {
2560            thread.push_user_content_block(
2561                Some(message_1_id.clone()),
2562                acp::ContentBlock::Text(acp::TextContent {
2563                    annotations: None,
2564                    text: "world!".to_string(),
2565                    meta: None,
2566                }),
2567                cx,
2568            );
2569        });
2570
2571        thread.update(cx, |thread, cx| {
2572            assert_eq!(thread.entries.len(), 1);
2573            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2574                assert_eq!(user_msg.id, Some(message_1_id));
2575                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2576            } else {
2577                panic!("Expected UserMessage");
2578            }
2579        });
2580
2581        // Test creating new user message after assistant message
2582        thread.update(cx, |thread, cx| {
2583            thread.push_assistant_content_block(
2584                acp::ContentBlock::Text(acp::TextContent {
2585                    annotations: None,
2586                    text: "Assistant response".to_string(),
2587                    meta: None,
2588                }),
2589                false,
2590                cx,
2591            );
2592        });
2593
2594        let message_2_id = UserMessageId::new();
2595        thread.update(cx, |thread, cx| {
2596            thread.push_user_content_block(
2597                Some(message_2_id.clone()),
2598                acp::ContentBlock::Text(acp::TextContent {
2599                    annotations: None,
2600                    text: "New user message".to_string(),
2601                    meta: None,
2602                }),
2603                cx,
2604            );
2605        });
2606
2607        thread.update(cx, |thread, cx| {
2608            assert_eq!(thread.entries.len(), 3);
2609            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2610                assert_eq!(user_msg.id, Some(message_2_id));
2611                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2612            } else {
2613                panic!("Expected UserMessage at index 2");
2614            }
2615        });
2616    }
2617
2618    #[gpui::test]
2619    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2620        init_test(cx);
2621
2622        let fs = FakeFs::new(cx.executor());
2623        let project = Project::test(fs, [], cx).await;
2624        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2625            |_, thread, mut cx| {
2626                async move {
2627                    thread.update(&mut cx, |thread, cx| {
2628                        thread
2629                            .handle_session_update(
2630                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2631                                    content: "Thinking ".into(),
2632                                    meta: None,
2633                                }),
2634                                cx,
2635                            )
2636                            .unwrap();
2637                        thread
2638                            .handle_session_update(
2639                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2640                                    content: "hard!".into(),
2641                                    meta: None,
2642                                }),
2643                                cx,
2644                            )
2645                            .unwrap();
2646                    })?;
2647                    Ok(acp::PromptResponse {
2648                        stop_reason: acp::StopReason::EndTurn,
2649                        meta: None,
2650                    })
2651                }
2652                .boxed_local()
2653            },
2654        ));
2655
2656        let thread = cx
2657            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2658            .await
2659            .unwrap();
2660
2661        thread
2662            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2663            .await
2664            .unwrap();
2665
2666        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2667        assert_eq!(
2668            output,
2669            indoc! {r#"
2670            ## User
2671
2672            Hello from Zed!
2673
2674            ## Assistant
2675
2676            <thinking>
2677            Thinking hard!
2678            </thinking>
2679
2680            "#}
2681        );
2682    }
2683
2684    #[gpui::test]
2685    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2686        init_test(cx);
2687
2688        let fs = FakeFs::new(cx.executor());
2689        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2690            .await;
2691        let project = Project::test(fs.clone(), [], cx).await;
2692        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2693        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2694        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2695            move |_, thread, mut cx| {
2696                let read_file_tx = read_file_tx.clone();
2697                async move {
2698                    let content = thread
2699                        .update(&mut cx, |thread, cx| {
2700                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2701                        })
2702                        .unwrap()
2703                        .await
2704                        .unwrap();
2705                    assert_eq!(content, "one\ntwo\nthree\n");
2706                    read_file_tx.take().unwrap().send(()).unwrap();
2707                    thread
2708                        .update(&mut cx, |thread, cx| {
2709                            thread.write_text_file(
2710                                path!("/tmp/foo").into(),
2711                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2712                                cx,
2713                            )
2714                        })
2715                        .unwrap()
2716                        .await
2717                        .unwrap();
2718                    Ok(acp::PromptResponse {
2719                        stop_reason: acp::StopReason::EndTurn,
2720                        meta: None,
2721                    })
2722                }
2723                .boxed_local()
2724            },
2725        ));
2726
2727        let (worktree, pathbuf) = project
2728            .update(cx, |project, cx| {
2729                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2730            })
2731            .await
2732            .unwrap();
2733        let buffer = project
2734            .update(cx, |project, cx| {
2735                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2736            })
2737            .await
2738            .unwrap();
2739
2740        let thread = cx
2741            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2742            .await
2743            .unwrap();
2744
2745        let request = thread.update(cx, |thread, cx| {
2746            thread.send_raw("Extend the count in /tmp/foo", cx)
2747        });
2748        read_file_rx.await.ok();
2749        buffer.update(cx, |buffer, cx| {
2750            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2751        });
2752        cx.run_until_parked();
2753        assert_eq!(
2754            buffer.read_with(cx, |buffer, _| buffer.text()),
2755            "zero\none\ntwo\nthree\nfour\nfive\n"
2756        );
2757        assert_eq!(
2758            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2759            "zero\none\ntwo\nthree\nfour\nfive\n"
2760        );
2761        request.await.unwrap();
2762    }
2763
2764    #[gpui::test]
2765    async fn test_reading_from_line(cx: &mut TestAppContext) {
2766        init_test(cx);
2767
2768        let fs = FakeFs::new(cx.executor());
2769        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2770            .await;
2771        let project = Project::test(fs.clone(), [], cx).await;
2772        project
2773            .update(cx, |project, cx| {
2774                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2775            })
2776            .await
2777            .unwrap();
2778
2779        let connection = Rc::new(FakeAgentConnection::new());
2780
2781        let thread = cx
2782            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2783            .await
2784            .unwrap();
2785
2786        // Whole file
2787        let content = thread
2788            .update(cx, |thread, cx| {
2789                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2790            })
2791            .await
2792            .unwrap();
2793
2794        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2795
2796        // Only start line
2797        let content = thread
2798            .update(cx, |thread, cx| {
2799                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2800            })
2801            .await
2802            .unwrap();
2803
2804        assert_eq!(content, "three\nfour\n");
2805
2806        // Only limit
2807        let content = thread
2808            .update(cx, |thread, cx| {
2809                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2810            })
2811            .await
2812            .unwrap();
2813
2814        assert_eq!(content, "one\ntwo\n");
2815
2816        // Range
2817        let content = thread
2818            .update(cx, |thread, cx| {
2819                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2820            })
2821            .await
2822            .unwrap();
2823
2824        assert_eq!(content, "two\nthree\n");
2825
2826        // Invalid
2827        let err = thread
2828            .update(cx, |thread, cx| {
2829                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2830            })
2831            .await
2832            .unwrap_err();
2833
2834        assert_eq!(
2835            err.to_string(),
2836            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2837        );
2838    }
2839
2840    #[gpui::test]
2841    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2842        init_test(cx);
2843
2844        let fs = FakeFs::new(cx.executor());
2845        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2846        let project = Project::test(fs.clone(), [], cx).await;
2847        project
2848            .update(cx, |project, cx| {
2849                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2850            })
2851            .await
2852            .unwrap();
2853
2854        let connection = Rc::new(FakeAgentConnection::new());
2855
2856        let thread = cx
2857            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2858            .await
2859            .unwrap();
2860
2861        // Whole file
2862        let content = thread
2863            .update(cx, |thread, cx| {
2864                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2865            })
2866            .await
2867            .unwrap();
2868
2869        assert_eq!(content, "");
2870
2871        // Only start line
2872        let content = thread
2873            .update(cx, |thread, cx| {
2874                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2875            })
2876            .await
2877            .unwrap();
2878
2879        assert_eq!(content, "");
2880
2881        // Only limit
2882        let content = thread
2883            .update(cx, |thread, cx| {
2884                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2885            })
2886            .await
2887            .unwrap();
2888
2889        assert_eq!(content, "");
2890
2891        // Range
2892        let content = thread
2893            .update(cx, |thread, cx| {
2894                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2895            })
2896            .await
2897            .unwrap();
2898
2899        assert_eq!(content, "");
2900
2901        // Invalid
2902        let err = thread
2903            .update(cx, |thread, cx| {
2904                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2905            })
2906            .await
2907            .unwrap_err();
2908
2909        assert_eq!(
2910            err.to_string(),
2911            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2912        );
2913    }
2914    #[gpui::test]
2915    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2916        init_test(cx);
2917
2918        let fs = FakeFs::new(cx.executor());
2919        fs.insert_tree(path!("/tmp"), json!({})).await;
2920        let project = Project::test(fs.clone(), [], cx).await;
2921        project
2922            .update(cx, |project, cx| {
2923                project.find_or_create_worktree(path!("/tmp"), true, cx)
2924            })
2925            .await
2926            .unwrap();
2927
2928        let connection = Rc::new(FakeAgentConnection::new());
2929
2930        let thread = cx
2931            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2932            .await
2933            .unwrap();
2934
2935        // Out of project file
2936        let err = thread
2937            .update(cx, |thread, cx| {
2938                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2939            })
2940            .await
2941            .unwrap_err();
2942
2943        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2944    }
2945
2946    #[gpui::test]
2947    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2948        init_test(cx);
2949
2950        let fs = FakeFs::new(cx.executor());
2951        let project = Project::test(fs, [], cx).await;
2952        let id = acp::ToolCallId("test".into());
2953
2954        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2955            let id = id.clone();
2956            move |_, thread, mut cx| {
2957                let id = id.clone();
2958                async move {
2959                    thread
2960                        .update(&mut cx, |thread, cx| {
2961                            thread.handle_session_update(
2962                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2963                                    id: id.clone(),
2964                                    title: "Label".into(),
2965                                    kind: acp::ToolKind::Fetch,
2966                                    status: acp::ToolCallStatus::InProgress,
2967                                    content: vec![],
2968                                    locations: vec![],
2969                                    raw_input: None,
2970                                    raw_output: None,
2971                                    meta: None,
2972                                }),
2973                                cx,
2974                            )
2975                        })
2976                        .unwrap()
2977                        .unwrap();
2978                    Ok(acp::PromptResponse {
2979                        stop_reason: acp::StopReason::EndTurn,
2980                        meta: None,
2981                    })
2982                }
2983                .boxed_local()
2984            }
2985        }));
2986
2987        let thread = cx
2988            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2989            .await
2990            .unwrap();
2991
2992        let request = thread.update(cx, |thread, cx| {
2993            thread.send_raw("Fetch https://example.com", cx)
2994        });
2995
2996        run_until_first_tool_call(&thread, cx).await;
2997
2998        thread.read_with(cx, |thread, _| {
2999            assert!(matches!(
3000                thread.entries[1],
3001                AgentThreadEntry::ToolCall(ToolCall {
3002                    status: ToolCallStatus::InProgress,
3003                    ..
3004                })
3005            ));
3006        });
3007
3008        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3009
3010        thread.read_with(cx, |thread, _| {
3011            assert!(matches!(
3012                &thread.entries[1],
3013                AgentThreadEntry::ToolCall(ToolCall {
3014                    status: ToolCallStatus::Canceled,
3015                    ..
3016                })
3017            ));
3018        });
3019
3020        thread
3021            .update(cx, |thread, cx| {
3022                thread.handle_session_update(
3023                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3024                        id,
3025                        fields: acp::ToolCallUpdateFields {
3026                            status: Some(acp::ToolCallStatus::Completed),
3027                            ..Default::default()
3028                        },
3029                        meta: None,
3030                    }),
3031                    cx,
3032                )
3033            })
3034            .unwrap();
3035
3036        request.await.unwrap();
3037
3038        thread.read_with(cx, |thread, _| {
3039            assert!(matches!(
3040                thread.entries[1],
3041                AgentThreadEntry::ToolCall(ToolCall {
3042                    status: ToolCallStatus::Completed,
3043                    ..
3044                })
3045            ));
3046        });
3047    }
3048
3049    #[gpui::test]
3050    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3051        init_test(cx);
3052        let fs = FakeFs::new(cx.background_executor.clone());
3053        fs.insert_tree(path!("/test"), json!({})).await;
3054        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3055
3056        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3057            move |_, thread, mut cx| {
3058                async move {
3059                    thread
3060                        .update(&mut cx, |thread, cx| {
3061                            thread.handle_session_update(
3062                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3063                                    id: acp::ToolCallId("test".into()),
3064                                    title: "Label".into(),
3065                                    kind: acp::ToolKind::Edit,
3066                                    status: acp::ToolCallStatus::Completed,
3067                                    content: vec![acp::ToolCallContent::Diff {
3068                                        diff: acp::Diff {
3069                                            path: "/test/test.txt".into(),
3070                                            old_text: None,
3071                                            new_text: "foo".into(),
3072                                            meta: None,
3073                                        },
3074                                    }],
3075                                    locations: vec![],
3076                                    raw_input: None,
3077                                    raw_output: None,
3078                                    meta: None,
3079                                }),
3080                                cx,
3081                            )
3082                        })
3083                        .unwrap()
3084                        .unwrap();
3085                    Ok(acp::PromptResponse {
3086                        stop_reason: acp::StopReason::EndTurn,
3087                        meta: None,
3088                    })
3089                }
3090                .boxed_local()
3091            }
3092        }));
3093
3094        let thread = cx
3095            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3096            .await
3097            .unwrap();
3098
3099        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3100            .await
3101            .unwrap();
3102
3103        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3104    }
3105
3106    #[gpui::test(iterations = 10)]
3107    async fn test_checkpoints(cx: &mut TestAppContext) {
3108        init_test(cx);
3109        let fs = FakeFs::new(cx.background_executor.clone());
3110        fs.insert_tree(
3111            path!("/test"),
3112            json!({
3113                ".git": {}
3114            }),
3115        )
3116        .await;
3117        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3118
3119        let simulate_changes = Arc::new(AtomicBool::new(true));
3120        let next_filename = Arc::new(AtomicUsize::new(0));
3121        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3122            let simulate_changes = simulate_changes.clone();
3123            let next_filename = next_filename.clone();
3124            let fs = fs.clone();
3125            move |request, thread, mut cx| {
3126                let fs = fs.clone();
3127                let simulate_changes = simulate_changes.clone();
3128                let next_filename = next_filename.clone();
3129                async move {
3130                    if simulate_changes.load(SeqCst) {
3131                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3132                        fs.write(Path::new(&filename), b"").await?;
3133                    }
3134
3135                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3136                        panic!("expected text content block");
3137                    };
3138                    thread.update(&mut cx, |thread, cx| {
3139                        thread
3140                            .handle_session_update(
3141                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3142                                    content: content.text.to_uppercase().into(),
3143                                    meta: None,
3144                                }),
3145                                cx,
3146                            )
3147                            .unwrap();
3148                    })?;
3149                    Ok(acp::PromptResponse {
3150                        stop_reason: acp::StopReason::EndTurn,
3151                        meta: None,
3152                    })
3153                }
3154                .boxed_local()
3155            }
3156        }));
3157        let thread = cx
3158            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3159            .await
3160            .unwrap();
3161
3162        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3163            .await
3164            .unwrap();
3165        thread.read_with(cx, |thread, cx| {
3166            assert_eq!(
3167                thread.to_markdown(cx),
3168                indoc! {"
3169                    ## User (checkpoint)
3170
3171                    Lorem
3172
3173                    ## Assistant
3174
3175                    LOREM
3176
3177                "}
3178            );
3179        });
3180        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3181
3182        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3183            .await
3184            .unwrap();
3185        thread.read_with(cx, |thread, cx| {
3186            assert_eq!(
3187                thread.to_markdown(cx),
3188                indoc! {"
3189                    ## User (checkpoint)
3190
3191                    Lorem
3192
3193                    ## Assistant
3194
3195                    LOREM
3196
3197                    ## User (checkpoint)
3198
3199                    ipsum
3200
3201                    ## Assistant
3202
3203                    IPSUM
3204
3205                "}
3206            );
3207        });
3208        assert_eq!(
3209            fs.files(),
3210            vec![
3211                Path::new(path!("/test/file-0")),
3212                Path::new(path!("/test/file-1"))
3213            ]
3214        );
3215
3216        // Checkpoint isn't stored when there are no changes.
3217        simulate_changes.store(false, SeqCst);
3218        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3219            .await
3220            .unwrap();
3221        thread.read_with(cx, |thread, cx| {
3222            assert_eq!(
3223                thread.to_markdown(cx),
3224                indoc! {"
3225                    ## User (checkpoint)
3226
3227                    Lorem
3228
3229                    ## Assistant
3230
3231                    LOREM
3232
3233                    ## User (checkpoint)
3234
3235                    ipsum
3236
3237                    ## Assistant
3238
3239                    IPSUM
3240
3241                    ## User
3242
3243                    dolor
3244
3245                    ## Assistant
3246
3247                    DOLOR
3248
3249                "}
3250            );
3251        });
3252        assert_eq!(
3253            fs.files(),
3254            vec![
3255                Path::new(path!("/test/file-0")),
3256                Path::new(path!("/test/file-1"))
3257            ]
3258        );
3259
3260        // Rewinding the conversation truncates the history and restores the checkpoint.
3261        thread
3262            .update(cx, |thread, cx| {
3263                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3264                    panic!("unexpected entries {:?}", thread.entries)
3265                };
3266                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3267            })
3268            .await
3269            .unwrap();
3270        thread.read_with(cx, |thread, cx| {
3271            assert_eq!(
3272                thread.to_markdown(cx),
3273                indoc! {"
3274                    ## User (checkpoint)
3275
3276                    Lorem
3277
3278                    ## Assistant
3279
3280                    LOREM
3281
3282                "}
3283            );
3284        });
3285        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3286    }
3287
3288    #[gpui::test]
3289    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3290        use std::sync::atomic::AtomicUsize;
3291        init_test(cx);
3292
3293        let fs = FakeFs::new(cx.executor());
3294        let project = Project::test(fs, None, cx).await;
3295
3296        // Create a connection that simulates refusal after tool result
3297        let prompt_count = Arc::new(AtomicUsize::new(0));
3298        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3299            let prompt_count = prompt_count.clone();
3300            move |_request, thread, mut cx| {
3301                let count = prompt_count.fetch_add(1, SeqCst);
3302                async move {
3303                    if count == 0 {
3304                        // First prompt: Generate a tool call with result
3305                        thread.update(&mut cx, |thread, cx| {
3306                            thread
3307                                .handle_session_update(
3308                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3309                                        id: acp::ToolCallId("tool1".into()),
3310                                        title: "Test Tool".into(),
3311                                        kind: acp::ToolKind::Fetch,
3312                                        status: acp::ToolCallStatus::Completed,
3313                                        content: vec![],
3314                                        locations: vec![],
3315                                        raw_input: Some(serde_json::json!({"query": "test"})),
3316                                        raw_output: Some(
3317                                            serde_json::json!({"result": "inappropriate content"}),
3318                                        ),
3319                                        meta: None,
3320                                    }),
3321                                    cx,
3322                                )
3323                                .unwrap();
3324                        })?;
3325
3326                        // Now return refusal because of the tool result
3327                        Ok(acp::PromptResponse {
3328                            stop_reason: acp::StopReason::Refusal,
3329                            meta: None,
3330                        })
3331                    } else {
3332                        Ok(acp::PromptResponse {
3333                            stop_reason: acp::StopReason::EndTurn,
3334                            meta: None,
3335                        })
3336                    }
3337                }
3338                .boxed_local()
3339            }
3340        }));
3341
3342        let thread = cx
3343            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3344            .await
3345            .unwrap();
3346
3347        // Track if we see a Refusal event
3348        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3349        let saw_refusal_event_captured = saw_refusal_event.clone();
3350        thread.update(cx, |_thread, cx| {
3351            cx.subscribe(
3352                &thread,
3353                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3354                    if matches!(event, AcpThreadEvent::Refusal) {
3355                        *saw_refusal_event_captured.lock().unwrap() = true;
3356                    }
3357                },
3358            )
3359            .detach();
3360        });
3361
3362        // Send a user message - this will trigger tool call and then refusal
3363        let send_task = thread.update(cx, |thread, cx| {
3364            thread.send(
3365                vec![acp::ContentBlock::Text(acp::TextContent {
3366                    text: "Hello".into(),
3367                    annotations: None,
3368                    meta: None,
3369                })],
3370                cx,
3371            )
3372        });
3373        cx.background_executor.spawn(send_task).detach();
3374        cx.run_until_parked();
3375
3376        // Verify that:
3377        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3378        // 2. The user message was NOT truncated
3379        assert!(
3380            *saw_refusal_event.lock().unwrap(),
3381            "Refusal event should be emitted for tool result refusals"
3382        );
3383
3384        thread.read_with(cx, |thread, _| {
3385            let entries = thread.entries();
3386            assert!(entries.len() >= 2, "Should have user message and tool call");
3387
3388            // Verify user message is still there
3389            assert!(
3390                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3391                "User message should not be truncated"
3392            );
3393
3394            // Verify tool call is there with result
3395            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3396                assert!(
3397                    tool_call.raw_output.is_some(),
3398                    "Tool call should have output"
3399                );
3400            } else {
3401                panic!("Expected tool call at index 1");
3402            }
3403        });
3404    }
3405
3406    #[gpui::test]
3407    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3408        init_test(cx);
3409
3410        let fs = FakeFs::new(cx.executor());
3411        let project = Project::test(fs, None, cx).await;
3412
3413        let refuse_next = Arc::new(AtomicBool::new(false));
3414        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3415            let refuse_next = refuse_next.clone();
3416            move |_request, _thread, _cx| {
3417                if refuse_next.load(SeqCst) {
3418                    async move {
3419                        Ok(acp::PromptResponse {
3420                            stop_reason: acp::StopReason::Refusal,
3421                            meta: None,
3422                        })
3423                    }
3424                    .boxed_local()
3425                } else {
3426                    async move {
3427                        Ok(acp::PromptResponse {
3428                            stop_reason: acp::StopReason::EndTurn,
3429                            meta: None,
3430                        })
3431                    }
3432                    .boxed_local()
3433                }
3434            }
3435        }));
3436
3437        let thread = cx
3438            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3439            .await
3440            .unwrap();
3441
3442        // Track if we see a Refusal event
3443        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3444        let saw_refusal_event_captured = saw_refusal_event.clone();
3445        thread.update(cx, |_thread, cx| {
3446            cx.subscribe(
3447                &thread,
3448                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3449                    if matches!(event, AcpThreadEvent::Refusal) {
3450                        *saw_refusal_event_captured.lock().unwrap() = true;
3451                    }
3452                },
3453            )
3454            .detach();
3455        });
3456
3457        // Send a message that will be refused
3458        refuse_next.store(true, SeqCst);
3459        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3460            .await
3461            .unwrap();
3462
3463        // Verify that a Refusal event WAS emitted for user prompt refusal
3464        assert!(
3465            *saw_refusal_event.lock().unwrap(),
3466            "Refusal event should be emitted for user prompt refusals"
3467        );
3468
3469        // Verify the message was truncated (user prompt refusal)
3470        thread.read_with(cx, |thread, cx| {
3471            assert_eq!(thread.to_markdown(cx), "");
3472        });
3473    }
3474
3475    #[gpui::test]
3476    async fn test_refusal(cx: &mut TestAppContext) {
3477        init_test(cx);
3478        let fs = FakeFs::new(cx.background_executor.clone());
3479        fs.insert_tree(path!("/"), json!({})).await;
3480        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3481
3482        let refuse_next = Arc::new(AtomicBool::new(false));
3483        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3484            let refuse_next = refuse_next.clone();
3485            move |request, thread, mut cx| {
3486                let refuse_next = refuse_next.clone();
3487                async move {
3488                    if refuse_next.load(SeqCst) {
3489                        return Ok(acp::PromptResponse {
3490                            stop_reason: acp::StopReason::Refusal,
3491                            meta: None,
3492                        });
3493                    }
3494
3495                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3496                        panic!("expected text content block");
3497                    };
3498                    thread.update(&mut cx, |thread, cx| {
3499                        thread
3500                            .handle_session_update(
3501                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3502                                    content: content.text.to_uppercase().into(),
3503                                    meta: None,
3504                                }),
3505                                cx,
3506                            )
3507                            .unwrap();
3508                    })?;
3509                    Ok(acp::PromptResponse {
3510                        stop_reason: acp::StopReason::EndTurn,
3511                        meta: None,
3512                    })
3513                }
3514                .boxed_local()
3515            }
3516        }));
3517        let thread = cx
3518            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3519            .await
3520            .unwrap();
3521
3522        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3523            .await
3524            .unwrap();
3525        thread.read_with(cx, |thread, cx| {
3526            assert_eq!(
3527                thread.to_markdown(cx),
3528                indoc! {"
3529                    ## User
3530
3531                    hello
3532
3533                    ## Assistant
3534
3535                    HELLO
3536
3537                "}
3538            );
3539        });
3540
3541        // Simulate refusing the second message. The message should be truncated
3542        // when a user prompt is refused.
3543        refuse_next.store(true, SeqCst);
3544        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3545            .await
3546            .unwrap();
3547        thread.read_with(cx, |thread, cx| {
3548            assert_eq!(
3549                thread.to_markdown(cx),
3550                indoc! {"
3551                    ## User
3552
3553                    hello
3554
3555                    ## Assistant
3556
3557                    HELLO
3558
3559                "}
3560            );
3561        });
3562    }
3563
3564    async fn run_until_first_tool_call(
3565        thread: &Entity<AcpThread>,
3566        cx: &mut TestAppContext,
3567    ) -> usize {
3568        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3569
3570        let subscription = cx.update(|cx| {
3571            cx.subscribe(thread, move |thread, _, cx| {
3572                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3573                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3574                        return tx.try_send(ix).unwrap();
3575                    }
3576                }
3577            })
3578        });
3579
3580        select! {
3581            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3582                panic!("Timeout waiting for tool call")
3583            }
3584            ix = rx.next().fuse() => {
3585                drop(subscription);
3586                ix.unwrap()
3587            }
3588        }
3589    }
3590
3591    #[derive(Clone, Default)]
3592    struct FakeAgentConnection {
3593        auth_methods: Vec<acp::AuthMethod>,
3594        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3595        on_user_message: Option<
3596            Rc<
3597                dyn Fn(
3598                        acp::PromptRequest,
3599                        WeakEntity<AcpThread>,
3600                        AsyncApp,
3601                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3602                    + 'static,
3603            >,
3604        >,
3605    }
3606
3607    impl FakeAgentConnection {
3608        fn new() -> Self {
3609            Self {
3610                auth_methods: Vec::new(),
3611                on_user_message: None,
3612                sessions: Arc::default(),
3613            }
3614        }
3615
3616        #[expect(unused)]
3617        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3618            self.auth_methods = auth_methods;
3619            self
3620        }
3621
3622        fn on_user_message(
3623            mut self,
3624            handler: impl Fn(
3625                acp::PromptRequest,
3626                WeakEntity<AcpThread>,
3627                AsyncApp,
3628            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3629            + 'static,
3630        ) -> Self {
3631            self.on_user_message.replace(Rc::new(handler));
3632            self
3633        }
3634    }
3635
3636    impl AgentConnection for FakeAgentConnection {
3637        fn telemetry_id(&self) -> &'static str {
3638            "fake"
3639        }
3640
3641        fn auth_methods(&self) -> &[acp::AuthMethod] {
3642            &self.auth_methods
3643        }
3644
3645        fn new_thread(
3646            self: Rc<Self>,
3647            project: Entity<Project>,
3648            _cwd: &Path,
3649            cx: &mut App,
3650        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3651            let session_id = acp::SessionId(
3652                rand::rng()
3653                    .sample_iter(&distr::Alphanumeric)
3654                    .take(7)
3655                    .map(char::from)
3656                    .collect::<String>()
3657                    .into(),
3658            );
3659            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3660            let thread = cx.new(|cx| {
3661                AcpThread::new(
3662                    "Test",
3663                    self.clone(),
3664                    project,
3665                    action_log,
3666                    session_id.clone(),
3667                    watch::Receiver::constant(acp::PromptCapabilities {
3668                        image: true,
3669                        audio: true,
3670                        embedded_context: true,
3671                        meta: None,
3672                    }),
3673                    cx,
3674                )
3675            });
3676            self.sessions.lock().insert(session_id, thread.downgrade());
3677            Task::ready(Ok(thread))
3678        }
3679
3680        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3681            if self.auth_methods().iter().any(|m| m.id == method) {
3682                Task::ready(Ok(()))
3683            } else {
3684                Task::ready(Err(anyhow!("Invalid Auth Method")))
3685            }
3686        }
3687
3688        fn prompt(
3689            &self,
3690            _id: Option<UserMessageId>,
3691            params: acp::PromptRequest,
3692            cx: &mut App,
3693        ) -> Task<gpui::Result<acp::PromptResponse>> {
3694            let sessions = self.sessions.lock();
3695            let thread = sessions.get(&params.session_id).unwrap();
3696            if let Some(handler) = &self.on_user_message {
3697                let handler = handler.clone();
3698                let thread = thread.clone();
3699                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3700            } else {
3701                Task::ready(Ok(acp::PromptResponse {
3702                    stop_reason: acp::StopReason::EndTurn,
3703                    meta: None,
3704                }))
3705            }
3706        }
3707
3708        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3709            let sessions = self.sessions.lock();
3710            let thread = sessions.get(session_id).unwrap().clone();
3711
3712            cx.spawn(async move |cx| {
3713                thread
3714                    .update(cx, |thread, cx| thread.cancel(cx))
3715                    .unwrap()
3716                    .await
3717            })
3718            .detach();
3719        }
3720
3721        fn truncate(
3722            &self,
3723            session_id: &acp::SessionId,
3724            _cx: &App,
3725        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3726            Some(Rc::new(FakeAgentSessionEditor {
3727                _session_id: session_id.clone(),
3728            }))
3729        }
3730
3731        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3732            self
3733        }
3734    }
3735
    /// Truncation handle handed out by `FakeAgentConnection::truncate`.
    struct FakeAgentSessionEditor {
        // Session the editor was created for; kept but not read by `run`.
        _session_id: acp::SessionId,
    }
3739
    impl AgentSessionTruncate for FakeAgentSessionEditor {
        /// Ignores the message id and resolves immediately with `Ok(())`.
        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
3745
3746    #[gpui::test]
3747    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3748        init_test(cx);
3749
3750        let fs = FakeFs::new(cx.executor());
3751        let project = Project::test(fs, [], cx).await;
3752        let connection = Rc::new(FakeAgentConnection::new());
3753        let thread = cx
3754            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3755            .await
3756            .unwrap();
3757
3758        // Try to update a tool call that doesn't exist
3759        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3760        thread.update(cx, |thread, cx| {
3761            let result = thread.handle_session_update(
3762                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3763                    id: nonexistent_id.clone(),
3764                    fields: acp::ToolCallUpdateFields {
3765                        status: Some(acp::ToolCallStatus::Completed),
3766                        ..Default::default()
3767                    },
3768                    meta: None,
3769                }),
3770                cx,
3771            );
3772
3773            // The update should succeed (not return an error)
3774            assert!(result.is_ok());
3775
3776            // There should now be exactly one entry in the thread
3777            assert_eq!(thread.entries.len(), 1);
3778
3779            // The entry should be a failed tool call
3780            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3781                assert_eq!(tool_call.id, nonexistent_id);
3782                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3783                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3784
3785                // Check that the content contains the error message
3786                assert_eq!(tool_call.content.len(), 1);
3787                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3788                    match content_block {
3789                        ContentBlock::Markdown { markdown } => {
3790                            let markdown_text = markdown.read(cx).source();
3791                            assert!(markdown_text.contains("Tool call not found"));
3792                        }
3793                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3794                        ContentBlock::ResourceLink { .. } => {
3795                            panic!("Expected markdown content, got resource link")
3796                        }
3797                    }
3798                } else {
3799                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3800                }
3801            } else {
3802                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3803            }
3804        });
3805    }
3806}