acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use agent_settings::AgentSettings;
   7use collections::HashSet;
   8pub use connection::*;
   9pub use diff::*;
  10use language::language_settings::FormatOnSave;
  11pub use mention::*;
  12use project::lsp_store::{FormatTrigger, LspFormatTarget};
  13use serde::{Deserialize, Serialize};
  14use settings::Settings as _;
  15use task::{Shell, ShellBuilder};
  16pub use terminal::*;
  17
  18use action_log::{ActionLog, ActionLogTelemetry};
  19use agent_client_protocol::{self as acp};
  20use anyhow::{Context as _, Result, anyhow};
  21use editor::Bias;
  22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  24use itertools::Itertools;
  25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  26use markdown::Markdown;
  27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  28use std::collections::HashMap;
  29use std::error::Error;
  30use std::fmt::{Formatter, Write};
  31use std::ops::Range;
  32use std::process::ExitStatus;
  33use std::rc::Rc;
  34use std::time::{Duration, Instant};
  35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  36use ui::App;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
  40#[derive(Debug)]
  41pub struct UserMessage {
  42    pub id: Option<UserMessageId>,
  43    pub content: ContentBlock,
  44    pub chunks: Vec<acp::ContentBlock>,
  45    pub checkpoint: Option<Checkpoint>,
  46}
  47
  48#[derive(Debug)]
  49pub struct Checkpoint {
  50    git_checkpoint: GitStoreCheckpoint,
  51    pub show: bool,
  52}
  53
  54impl UserMessage {
  55    fn to_markdown(&self, cx: &App) -> String {
  56        let mut markdown = String::new();
  57        if self
  58            .checkpoint
  59            .as_ref()
  60            .is_some_and(|checkpoint| checkpoint.show)
  61        {
  62            writeln!(markdown, "## User (checkpoint)").unwrap();
  63        } else {
  64            writeln!(markdown, "## User").unwrap();
  65        }
  66        writeln!(markdown).unwrap();
  67        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  68        writeln!(markdown).unwrap();
  69        markdown
  70    }
  71}
  72
  73#[derive(Debug, PartialEq)]
  74pub struct AssistantMessage {
  75    pub chunks: Vec<AssistantMessageChunk>,
  76}
  77
  78impl AssistantMessage {
  79    pub fn to_markdown(&self, cx: &App) -> String {
  80        format!(
  81            "## Assistant\n\n{}\n\n",
  82            self.chunks
  83                .iter()
  84                .map(|chunk| chunk.to_markdown(cx))
  85                .join("\n\n")
  86        )
  87    }
  88}
  89
  90#[derive(Debug, PartialEq)]
  91pub enum AssistantMessageChunk {
  92    Message { block: ContentBlock },
  93    Thought { block: ContentBlock },
  94}
  95
  96impl AssistantMessageChunk {
  97    pub fn from_str(
  98        chunk: &str,
  99        language_registry: &Arc<LanguageRegistry>,
 100        path_style: PathStyle,
 101        cx: &mut App,
 102    ) -> Self {
 103        Self::Message {
 104            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 105        }
 106    }
 107
 108    fn to_markdown(&self, cx: &App) -> String {
 109        match self {
 110            Self::Message { block } => block.to_markdown(cx).to_string(),
 111            Self::Thought { block } => {
 112                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 113            }
 114        }
 115    }
 116}
 117
 118#[derive(Debug)]
 119pub enum AgentThreadEntry {
 120    UserMessage(UserMessage),
 121    AssistantMessage(AssistantMessage),
 122    ToolCall(ToolCall),
 123}
 124
 125impl AgentThreadEntry {
 126    pub fn to_markdown(&self, cx: &App) -> String {
 127        match self {
 128            Self::UserMessage(message) => message.to_markdown(cx),
 129            Self::AssistantMessage(message) => message.to_markdown(cx),
 130            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 131        }
 132    }
 133
 134    pub fn user_message(&self) -> Option<&UserMessage> {
 135        if let AgentThreadEntry::UserMessage(message) = self {
 136            Some(message)
 137        } else {
 138            None
 139        }
 140    }
 141
 142    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 143        if let AgentThreadEntry::ToolCall(call) = self {
 144            itertools::Either::Left(call.diffs())
 145        } else {
 146            itertools::Either::Right(std::iter::empty())
 147        }
 148    }
 149
 150    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 151        if let AgentThreadEntry::ToolCall(call) = self {
 152            itertools::Either::Left(call.terminals())
 153        } else {
 154            itertools::Either::Right(std::iter::empty())
 155        }
 156    }
 157
 158    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 159        if let AgentThreadEntry::ToolCall(ToolCall {
 160            locations,
 161            resolved_locations,
 162            ..
 163        }) = self
 164        {
 165            Some((
 166                locations.get(ix)?.clone(),
 167                resolved_locations.get(ix)?.clone()?,
 168            ))
 169        } else {
 170            None
 171        }
 172    }
 173}
 174
 175#[derive(Debug)]
 176pub struct ToolCall {
 177    pub id: acp::ToolCallId,
 178    pub label: Entity<Markdown>,
 179    pub kind: acp::ToolKind,
 180    pub content: Vec<ToolCallContent>,
 181    pub status: ToolCallStatus,
 182    pub locations: Vec<acp::ToolCallLocation>,
 183    pub resolved_locations: Vec<Option<AgentLocation>>,
 184    pub raw_input: Option<serde_json::Value>,
 185    pub raw_output: Option<serde_json::Value>,
 186}
 187
 188impl ToolCall {
 189    fn from_acp(
 190        tool_call: acp::ToolCall,
 191        status: ToolCallStatus,
 192        language_registry: Arc<LanguageRegistry>,
 193        path_style: PathStyle,
 194        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 195        cx: &mut App,
 196    ) -> Result<Self> {
 197        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 198            first_line.to_owned() + ""
 199        } else {
 200            tool_call.title
 201        };
 202        let mut content = Vec::with_capacity(tool_call.content.len());
 203        for item in tool_call.content {
 204            content.push(ToolCallContent::from_acp(
 205                item,
 206                language_registry.clone(),
 207                path_style,
 208                terminals,
 209                cx,
 210            )?);
 211        }
 212
 213        let result = Self {
 214            id: tool_call.id,
 215            label: cx
 216                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 217            kind: tool_call.kind,
 218            content,
 219            locations: tool_call.locations,
 220            resolved_locations: Vec::default(),
 221            status,
 222            raw_input: tool_call.raw_input,
 223            raw_output: tool_call.raw_output,
 224        };
 225        Ok(result)
 226    }
 227
 228    fn update_fields(
 229        &mut self,
 230        fields: acp::ToolCallUpdateFields,
 231        language_registry: Arc<LanguageRegistry>,
 232        path_style: PathStyle,
 233        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 234        cx: &mut App,
 235    ) -> Result<()> {
 236        let acp::ToolCallUpdateFields {
 237            kind,
 238            status,
 239            title,
 240            content,
 241            locations,
 242            raw_input,
 243            raw_output,
 244        } = fields;
 245
 246        if let Some(kind) = kind {
 247            self.kind = kind;
 248        }
 249
 250        if let Some(status) = status {
 251            self.status = status.into();
 252        }
 253
 254        if let Some(title) = title {
 255            self.label.update(cx, |label, cx| {
 256                if let Some((first_line, _)) = title.split_once("\n") {
 257                    label.replace(first_line.to_owned() + "", cx)
 258                } else {
 259                    label.replace(title, cx);
 260                }
 261            });
 262        }
 263
 264        if let Some(content) = content {
 265            let new_content_len = content.len();
 266            let mut content = content.into_iter();
 267
 268            // Reuse existing content if we can
 269            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 270                old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 271            }
 272            for new in content {
 273                self.content.push(ToolCallContent::from_acp(
 274                    new,
 275                    language_registry.clone(),
 276                    path_style,
 277                    terminals,
 278                    cx,
 279                )?)
 280            }
 281            self.content.truncate(new_content_len);
 282        }
 283
 284        if let Some(locations) = locations {
 285            self.locations = locations;
 286        }
 287
 288        if let Some(raw_input) = raw_input {
 289            self.raw_input = Some(raw_input);
 290        }
 291
 292        if let Some(raw_output) = raw_output {
 293            if self.content.is_empty()
 294                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 295            {
 296                self.content
 297                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 298                        markdown,
 299                    }));
 300            }
 301            self.raw_output = Some(raw_output);
 302        }
 303        Ok(())
 304    }
 305
 306    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Diff(diff) => Some(diff),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Terminal(_) => None,
 311        })
 312    }
 313
 314    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 315        self.content.iter().filter_map(|content| match content {
 316            ToolCallContent::Terminal(terminal) => Some(terminal),
 317            ToolCallContent::ContentBlock(_) => None,
 318            ToolCallContent::Diff(_) => None,
 319        })
 320    }
 321
 322    fn to_markdown(&self, cx: &App) -> String {
 323        let mut markdown = format!(
 324            "**Tool Call: {}**\nStatus: {}\n\n",
 325            self.label.read(cx).source(),
 326            self.status
 327        );
 328        for content in &self.content {
 329            markdown.push_str(content.to_markdown(cx).as_str());
 330            markdown.push_str("\n\n");
 331        }
 332        markdown
 333    }
 334
 335    async fn resolve_location(
 336        location: acp::ToolCallLocation,
 337        project: WeakEntity<Project>,
 338        cx: &mut AsyncApp,
 339    ) -> Option<ResolvedLocation> {
 340        let buffer = project
 341            .update(cx, |project, cx| {
 342                project
 343                    .project_path_for_absolute_path(&location.path, cx)
 344                    .map(|path| project.open_buffer(path, cx))
 345            })
 346            .ok()??;
 347        let buffer = buffer.await.log_err()?;
 348        let position = buffer
 349            .update(cx, |buffer, _| {
 350                if let Some(row) = location.line {
 351                    let snapshot = buffer.snapshot();
 352                    let column = snapshot.indent_size_for_line(row).len;
 353                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 354                    snapshot.anchor_before(point)
 355                } else {
 356                    Anchor::MIN
 357                }
 358            })
 359            .ok()?;
 360
 361        Some(ResolvedLocation { buffer, position })
 362    }
 363
 364    fn resolve_locations(
 365        &self,
 366        project: Entity<Project>,
 367        cx: &mut App,
 368    ) -> Task<Vec<Option<ResolvedLocation>>> {
 369        let locations = self.locations.clone();
 370        project.update(cx, |_, cx| {
 371            cx.spawn(async move |project, cx| {
 372                let mut new_locations = Vec::new();
 373                for location in locations {
 374                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 375                }
 376                new_locations
 377            })
 378        })
 379    }
 380}
 381
 382// Separate so we can hold a strong reference to the buffer
 383// for saving on the thread
 384#[derive(Clone, Debug, PartialEq, Eq)]
 385struct ResolvedLocation {
 386    buffer: Entity<Buffer>,
 387    position: Anchor,
 388}
 389
 390impl From<&ResolvedLocation> for AgentLocation {
 391    fn from(value: &ResolvedLocation) -> Self {
 392        Self {
 393            buffer: value.buffer.downgrade(),
 394            position: value.position,
 395        }
 396    }
 397}
 398
 399#[derive(Debug)]
 400pub enum ToolCallStatus {
 401    /// The tool call hasn't started running yet, but we start showing it to
 402    /// the user.
 403    Pending,
 404    /// The tool call is waiting for confirmation from the user.
 405    WaitingForConfirmation {
 406        options: Vec<acp::PermissionOption>,
 407        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 408    },
 409    /// The tool call is currently running.
 410    InProgress,
 411    /// The tool call completed successfully.
 412    Completed,
 413    /// The tool call failed.
 414    Failed,
 415    /// The user rejected the tool call.
 416    Rejected,
 417    /// The user canceled generation so the tool call was canceled.
 418    Canceled,
 419}
 420
 421impl From<acp::ToolCallStatus> for ToolCallStatus {
 422    fn from(status: acp::ToolCallStatus) -> Self {
 423        match status {
 424            acp::ToolCallStatus::Pending => Self::Pending,
 425            acp::ToolCallStatus::InProgress => Self::InProgress,
 426            acp::ToolCallStatus::Completed => Self::Completed,
 427            acp::ToolCallStatus::Failed => Self::Failed,
 428        }
 429    }
 430}
 431
 432impl Display for ToolCallStatus {
 433    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 434        write!(
 435            f,
 436            "{}",
 437            match self {
 438                ToolCallStatus::Pending => "Pending",
 439                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 440                ToolCallStatus::InProgress => "In Progress",
 441                ToolCallStatus::Completed => "Completed",
 442                ToolCallStatus::Failed => "Failed",
 443                ToolCallStatus::Rejected => "Rejected",
 444                ToolCallStatus::Canceled => "Canceled",
 445            }
 446        )
 447    }
 448}
 449
 450#[derive(Debug, PartialEq, Clone)]
 451pub enum ContentBlock {
 452    Empty,
 453    Markdown { markdown: Entity<Markdown> },
 454    ResourceLink { resource_link: acp::ResourceLink },
 455}
 456
 457impl ContentBlock {
 458    pub fn new(
 459        block: acp::ContentBlock,
 460        language_registry: &Arc<LanguageRegistry>,
 461        path_style: PathStyle,
 462        cx: &mut App,
 463    ) -> Self {
 464        let mut this = Self::Empty;
 465        this.append(block, language_registry, path_style, cx);
 466        this
 467    }
 468
 469    pub fn new_combined(
 470        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 471        language_registry: Arc<LanguageRegistry>,
 472        path_style: PathStyle,
 473        cx: &mut App,
 474    ) -> Self {
 475        let mut this = Self::Empty;
 476        for block in blocks {
 477            this.append(block, &language_registry, path_style, cx);
 478        }
 479        this
 480    }
 481
 482    pub fn append(
 483        &mut self,
 484        block: acp::ContentBlock,
 485        language_registry: &Arc<LanguageRegistry>,
 486        path_style: PathStyle,
 487        cx: &mut App,
 488    ) {
 489        if matches!(self, ContentBlock::Empty)
 490            && let acp::ContentBlock::ResourceLink(resource_link) = block
 491        {
 492            *self = ContentBlock::ResourceLink { resource_link };
 493            return;
 494        }
 495
 496        let new_content = self.block_string_contents(block, path_style);
 497
 498        match self {
 499            ContentBlock::Empty => {
 500                *self = Self::create_markdown_block(new_content, language_registry, cx);
 501            }
 502            ContentBlock::Markdown { markdown } => {
 503                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 504            }
 505            ContentBlock::ResourceLink { resource_link } => {
 506                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 507                let combined = format!("{}\n{}", existing_content, new_content);
 508
 509                *self = Self::create_markdown_block(combined, language_registry, cx);
 510            }
 511        }
 512    }
 513
 514    fn create_markdown_block(
 515        content: String,
 516        language_registry: &Arc<LanguageRegistry>,
 517        cx: &mut App,
 518    ) -> ContentBlock {
 519        ContentBlock::Markdown {
 520            markdown: cx
 521                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 522        }
 523    }
 524
 525    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 526        match block {
 527            acp::ContentBlock::Text(text_content) => text_content.text,
 528            acp::ContentBlock::ResourceLink(resource_link) => {
 529                Self::resource_link_md(&resource_link.uri, path_style)
 530            }
 531            acp::ContentBlock::Resource(acp::EmbeddedResource {
 532                resource:
 533                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 534                        uri,
 535                        ..
 536                    }),
 537                ..
 538            }) => Self::resource_link_md(&uri, path_style),
 539            acp::ContentBlock::Image(image) => Self::image_md(&image),
 540            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 541        }
 542    }
 543
 544    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 545        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 546            uri.as_link().to_string()
 547        } else {
 548            uri.to_string()
 549        }
 550    }
 551
 552    fn image_md(_image: &acp::ImageContent) -> String {
 553        "`Image`".into()
 554    }
 555
 556    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 557        match self {
 558            ContentBlock::Empty => "",
 559            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 560            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 561        }
 562    }
 563
 564    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 565        match self {
 566            ContentBlock::Empty => None,
 567            ContentBlock::Markdown { markdown } => Some(markdown),
 568            ContentBlock::ResourceLink { .. } => None,
 569        }
 570    }
 571
 572    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 573        match self {
 574            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 575            _ => None,
 576        }
 577    }
 578}
 579
 580#[derive(Debug)]
 581pub enum ToolCallContent {
 582    ContentBlock(ContentBlock),
 583    Diff(Entity<Diff>),
 584    Terminal(Entity<Terminal>),
 585}
 586
 587impl ToolCallContent {
 588    pub fn from_acp(
 589        content: acp::ToolCallContent,
 590        language_registry: Arc<LanguageRegistry>,
 591        path_style: PathStyle,
 592        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 593        cx: &mut App,
 594    ) -> Result<Self> {
 595        match content {
 596            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 597                content,
 598                &language_registry,
 599                path_style,
 600                cx,
 601            ))),
 602            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 603                Diff::finalized(
 604                    diff.path.to_string_lossy().into_owned(),
 605                    diff.old_text,
 606                    diff.new_text,
 607                    language_registry,
 608                    cx,
 609                )
 610            }))),
 611            acp::ToolCallContent::Terminal { terminal_id } => terminals
 612                .get(&terminal_id)
 613                .cloned()
 614                .map(Self::Terminal)
 615                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 616        }
 617    }
 618
 619    pub fn update_from_acp(
 620        &mut self,
 621        new: acp::ToolCallContent,
 622        language_registry: Arc<LanguageRegistry>,
 623        path_style: PathStyle,
 624        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 625        cx: &mut App,
 626    ) -> Result<()> {
 627        let needs_update = match (&self, &new) {
 628            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 629                old_diff.read(cx).needs_update(
 630                    new_diff.old_text.as_deref().unwrap_or(""),
 631                    &new_diff.new_text,
 632                    cx,
 633                )
 634            }
 635            _ => true,
 636        };
 637
 638        if needs_update {
 639            *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
 640        }
 641        Ok(())
 642    }
 643
 644    pub fn to_markdown(&self, cx: &App) -> String {
 645        match self {
 646            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 647            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 648            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 649        }
 650    }
 651}
 652
 653#[derive(Debug, PartialEq)]
 654pub enum ToolCallUpdate {
 655    UpdateFields(acp::ToolCallUpdate),
 656    UpdateDiff(ToolCallUpdateDiff),
 657    UpdateTerminal(ToolCallUpdateTerminal),
 658}
 659
 660impl ToolCallUpdate {
 661    fn id(&self) -> &acp::ToolCallId {
 662        match self {
 663            Self::UpdateFields(update) => &update.id,
 664            Self::UpdateDiff(diff) => &diff.id,
 665            Self::UpdateTerminal(terminal) => &terminal.id,
 666        }
 667    }
 668}
 669
 670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 671    fn from(update: acp::ToolCallUpdate) -> Self {
 672        Self::UpdateFields(update)
 673    }
 674}
 675
 676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 677    fn from(diff: ToolCallUpdateDiff) -> Self {
 678        Self::UpdateDiff(diff)
 679    }
 680}
 681
 682#[derive(Debug, PartialEq)]
 683pub struct ToolCallUpdateDiff {
 684    pub id: acp::ToolCallId,
 685    pub diff: Entity<Diff>,
 686}
 687
 688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 689    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 690        Self::UpdateTerminal(terminal)
 691    }
 692}
 693
 694#[derive(Debug, PartialEq)]
 695pub struct ToolCallUpdateTerminal {
 696    pub id: acp::ToolCallId,
 697    pub terminal: Entity<Terminal>,
 698}
 699
 700#[derive(Debug, Default)]
 701pub struct Plan {
 702    pub entries: Vec<PlanEntry>,
 703}
 704
 705#[derive(Debug)]
 706pub struct PlanStats<'a> {
 707    pub in_progress_entry: Option<&'a PlanEntry>,
 708    pub pending: u32,
 709    pub completed: u32,
 710}
 711
 712impl Plan {
 713    pub fn is_empty(&self) -> bool {
 714        self.entries.is_empty()
 715    }
 716
 717    pub fn stats(&self) -> PlanStats<'_> {
 718        let mut stats = PlanStats {
 719            in_progress_entry: None,
 720            pending: 0,
 721            completed: 0,
 722        };
 723
 724        for entry in &self.entries {
 725            match &entry.status {
 726                acp::PlanEntryStatus::Pending => {
 727                    stats.pending += 1;
 728                }
 729                acp::PlanEntryStatus::InProgress => {
 730                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 731                }
 732                acp::PlanEntryStatus::Completed => {
 733                    stats.completed += 1;
 734                }
 735            }
 736        }
 737
 738        stats
 739    }
 740}
 741
 742#[derive(Debug)]
 743pub struct PlanEntry {
 744    pub content: Entity<Markdown>,
 745    pub priority: acp::PlanEntryPriority,
 746    pub status: acp::PlanEntryStatus,
 747}
 748
 749impl PlanEntry {
 750    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 751        Self {
 752            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 753            priority: entry.priority,
 754            status: entry.status,
 755        }
 756    }
 757}
 758
 759#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 760pub struct TokenUsage {
 761    pub max_tokens: u64,
 762    pub used_tokens: u64,
 763}
 764
 765impl TokenUsage {
 766    pub fn ratio(&self) -> TokenUsageRatio {
 767        #[cfg(debug_assertions)]
 768        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 769            .unwrap_or("0.8".to_string())
 770            .parse()
 771            .unwrap();
 772        #[cfg(not(debug_assertions))]
 773        let warning_threshold: f32 = 0.8;
 774
 775        // When the maximum is unknown because there is no selected model,
 776        // avoid showing the token limit warning.
 777        if self.max_tokens == 0 {
 778            TokenUsageRatio::Normal
 779        } else if self.used_tokens >= self.max_tokens {
 780            TokenUsageRatio::Exceeded
 781        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 782            TokenUsageRatio::Warning
 783        } else {
 784            TokenUsageRatio::Normal
 785        }
 786    }
 787}
 788
 /// Classification of token usage relative to the token limit.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum TokenUsageRatio {
     /// Usage is below the warning threshold, or the limit is unknown.
     Normal,
     /// Usage is at or above the warning threshold but below the limit.
     Warning,
     /// Usage has reached or passed the limit.
     Exceeded,
 }
 795
 796#[derive(Debug, Clone)]
 797pub struct RetryStatus {
 798    pub last_error: SharedString,
 799    pub attempt: usize,
 800    pub max_attempts: usize,
 801    pub started_at: Instant,
 802    pub duration: Duration,
 803}
 804
 805pub struct AcpThread {
 806    title: SharedString,
 807    entries: Vec<AgentThreadEntry>,
 808    plan: Plan,
 809    project: Entity<Project>,
 810    action_log: Entity<ActionLog>,
 811    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 812    send_task: Option<Task<()>>,
 813    connection: Rc<dyn AgentConnection>,
 814    session_id: acp::SessionId,
 815    token_usage: Option<TokenUsage>,
 816    prompt_capabilities: acp::PromptCapabilities,
 817    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 818    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 819    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 820    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 821}
 822
 823impl From<&AcpThread> for ActionLogTelemetry {
 824    fn from(value: &AcpThread) -> Self {
 825        Self {
 826            agent_telemetry_id: value.connection().telemetry_id(),
 827            session_id: value.session_id.0.clone(),
 828        }
 829    }
 830}
 831
 832#[derive(Debug)]
 833pub enum AcpThreadEvent {
 834    NewEntry,
 835    TitleUpdated,
 836    TokenUsageUpdated,
 837    EntryUpdated(usize),
 838    EntriesRemoved(Range<usize>),
 839    ToolAuthorizationRequired,
 840    Retry(RetryStatus),
 841    Stopped,
 842    Error,
 843    LoadError(LoadError),
 844    PromptCapabilitiesUpdated,
 845    Refusal,
 846    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
 847    ModeUpdated(acp::SessionModeId),
 848}
 849
 850impl EventEmitter<AcpThreadEvent> for AcpThread {}
 851
 852#[derive(Debug, Clone)]
 853pub enum TerminalProviderEvent {
 854    Created {
 855        terminal_id: acp::TerminalId,
 856        label: String,
 857        cwd: Option<PathBuf>,
 858        output_byte_limit: Option<u64>,
 859        terminal: Entity<::terminal::Terminal>,
 860    },
 861    Output {
 862        terminal_id: acp::TerminalId,
 863        data: Vec<u8>,
 864    },
 865    TitleChanged {
 866        terminal_id: acp::TerminalId,
 867        title: String,
 868    },
 869    Exit {
 870        terminal_id: acp::TerminalId,
 871        status: acp::TerminalExitStatus,
 872    },
 873}
 874
 875#[derive(Debug, Clone)]
 876pub enum TerminalProviderCommand {
 877    WriteInput {
 878        terminal_id: acp::TerminalId,
 879        bytes: Vec<u8>,
 880    },
 881    Resize {
 882        terminal_id: acp::TerminalId,
 883        cols: u16,
 884        rows: u16,
 885    },
 886    Close {
 887        terminal_id: acp::TerminalId,
 888    },
 889}
 890
 891impl AcpThread {
 892    pub fn on_terminal_provider_event(
 893        &mut self,
 894        event: TerminalProviderEvent,
 895        cx: &mut Context<Self>,
 896    ) {
 897        match event {
 898            TerminalProviderEvent::Created {
 899                terminal_id,
 900                label,
 901                cwd,
 902                output_byte_limit,
 903                terminal,
 904            } => {
 905                let entity = self.register_terminal_created(
 906                    terminal_id.clone(),
 907                    label,
 908                    cwd,
 909                    output_byte_limit,
 910                    terminal,
 911                    cx,
 912                );
 913
 914                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 915                    for data in chunks.drain(..) {
 916                        entity.update(cx, |term, cx| {
 917                            term.inner().update(cx, |inner, cx| {
 918                                inner.write_output(&data, cx);
 919                            })
 920                        });
 921                    }
 922                }
 923
 924                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 925                    entity.update(cx, |_term, cx| {
 926                        cx.notify();
 927                    });
 928                }
 929
 930                cx.notify();
 931            }
 932            TerminalProviderEvent::Output { terminal_id, data } => {
 933                if let Some(entity) = self.terminals.get(&terminal_id) {
 934                    entity.update(cx, |term, cx| {
 935                        term.inner().update(cx, |inner, cx| {
 936                            inner.write_output(&data, cx);
 937                        })
 938                    });
 939                } else {
 940                    self.pending_terminal_output
 941                        .entry(terminal_id)
 942                        .or_default()
 943                        .push(data);
 944                }
 945            }
 946            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 947                if let Some(entity) = self.terminals.get(&terminal_id) {
 948                    entity.update(cx, |term, cx| {
 949                        term.inner().update(cx, |inner, cx| {
 950                            inner.breadcrumb_text = title;
 951                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 952                        })
 953                    });
 954                }
 955            }
 956            TerminalProviderEvent::Exit {
 957                terminal_id,
 958                status,
 959            } => {
 960                if let Some(entity) = self.terminals.get(&terminal_id) {
 961                    entity.update(cx, |_term, cx| {
 962                        cx.notify();
 963                    });
 964                } else {
 965                    self.pending_terminal_exit.insert(terminal_id, status);
 966                }
 967            }
 968        }
 969    }
 970}
 971
 972#[derive(PartialEq, Eq, Debug)]
 973pub enum ThreadStatus {
 974    Idle,
 975    Generating,
 976}
 977
 978#[derive(Debug, Clone)]
 979pub enum LoadError {
 980    Unsupported {
 981        command: SharedString,
 982        current_version: SharedString,
 983        minimum_version: SharedString,
 984    },
 985    FailedToInstall(SharedString),
 986    Exited {
 987        status: ExitStatus,
 988    },
 989    Other(SharedString),
 990}
 991
 992impl Display for LoadError {
 993    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 994        match self {
 995            LoadError::Unsupported {
 996                command: path,
 997                current_version,
 998                minimum_version,
 999            } => {
1000                write!(
1001                    f,
1002                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1003                )
1004            }
1005            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1006            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1007            LoadError::Other(msg) => write!(f, "{msg}"),
1008        }
1009    }
1010}
1011
1012impl Error for LoadError {}
1013
1014impl AcpThread {
1015    pub fn new(
1016        title: impl Into<SharedString>,
1017        connection: Rc<dyn AgentConnection>,
1018        project: Entity<Project>,
1019        action_log: Entity<ActionLog>,
1020        session_id: acp::SessionId,
1021        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1022        cx: &mut Context<Self>,
1023    ) -> Self {
1024        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1025        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1026            loop {
1027                let caps = prompt_capabilities_rx.recv().await?;
1028                this.update(cx, |this, cx| {
1029                    this.prompt_capabilities = caps;
1030                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1031                })?;
1032            }
1033        });
1034
1035        Self {
1036            action_log,
1037            shared_buffers: Default::default(),
1038            entries: Default::default(),
1039            plan: Default::default(),
1040            title: title.into(),
1041            project,
1042            send_task: None,
1043            connection,
1044            session_id,
1045            token_usage: None,
1046            prompt_capabilities,
1047            _observe_prompt_capabilities: task,
1048            terminals: HashMap::default(),
1049            pending_terminal_output: HashMap::default(),
1050            pending_terminal_exit: HashMap::default(),
1051        }
1052    }
1053
1054    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1055        self.prompt_capabilities.clone()
1056    }
1057
1058    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1059        &self.connection
1060    }
1061
1062    pub fn action_log(&self) -> &Entity<ActionLog> {
1063        &self.action_log
1064    }
1065
1066    pub fn project(&self) -> &Entity<Project> {
1067        &self.project
1068    }
1069
1070    pub fn title(&self) -> SharedString {
1071        self.title.clone()
1072    }
1073
1074    pub fn entries(&self) -> &[AgentThreadEntry] {
1075        &self.entries
1076    }
1077
1078    pub fn session_id(&self) -> &acp::SessionId {
1079        &self.session_id
1080    }
1081
1082    pub fn status(&self) -> ThreadStatus {
1083        if self.send_task.is_some() {
1084            ThreadStatus::Generating
1085        } else {
1086            ThreadStatus::Idle
1087        }
1088    }
1089
1090    pub fn token_usage(&self) -> Option<&TokenUsage> {
1091        self.token_usage.as_ref()
1092    }
1093
1094    pub fn has_pending_edit_tool_calls(&self) -> bool {
1095        for entry in self.entries.iter().rev() {
1096            match entry {
1097                AgentThreadEntry::UserMessage(_) => return false,
1098                AgentThreadEntry::ToolCall(
1099                    call @ ToolCall {
1100                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1101                        ..
1102                    },
1103                ) if call.diffs().next().is_some() => {
1104                    return true;
1105                }
1106                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1107            }
1108        }
1109
1110        false
1111    }
1112
1113    pub fn used_tools_since_last_user_message(&self) -> bool {
1114        for entry in self.entries.iter().rev() {
1115            match entry {
1116                AgentThreadEntry::UserMessage(..) => return false,
1117                AgentThreadEntry::AssistantMessage(..) => continue,
1118                AgentThreadEntry::ToolCall(..) => return true,
1119            }
1120        }
1121
1122        false
1123    }
1124
1125    pub fn handle_session_update(
1126        &mut self,
1127        update: acp::SessionUpdate,
1128        cx: &mut Context<Self>,
1129    ) -> Result<(), acp::Error> {
1130        match update {
1131            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1132                self.push_user_content_block(None, content, cx);
1133            }
1134            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1135                self.push_assistant_content_block(content, false, cx);
1136            }
1137            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1138                self.push_assistant_content_block(content, true, cx);
1139            }
1140            acp::SessionUpdate::ToolCall(tool_call) => {
1141                self.upsert_tool_call(tool_call, cx)?;
1142            }
1143            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1144                self.update_tool_call(tool_call_update, cx)?;
1145            }
1146            acp::SessionUpdate::Plan(plan) => {
1147                self.update_plan(plan, cx);
1148            }
1149            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1150                available_commands,
1151                ..
1152            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1153            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1154                current_mode_id,
1155                ..
1156            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1157        }
1158        Ok(())
1159    }
1160
1161    pub fn push_user_content_block(
1162        &mut self,
1163        message_id: Option<UserMessageId>,
1164        chunk: acp::ContentBlock,
1165        cx: &mut Context<Self>,
1166    ) {
1167        let language_registry = self.project.read(cx).languages().clone();
1168        let path_style = self.project.read(cx).path_style(cx);
1169        let entries_len = self.entries.len();
1170
1171        if let Some(last_entry) = self.entries.last_mut()
1172            && let AgentThreadEntry::UserMessage(UserMessage {
1173                id,
1174                content,
1175                chunks,
1176                ..
1177            }) = last_entry
1178        {
1179            *id = message_id.or(id.take());
1180            content.append(chunk.clone(), &language_registry, path_style, cx);
1181            chunks.push(chunk);
1182            let idx = entries_len - 1;
1183            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1184        } else {
1185            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1186            self.push_entry(
1187                AgentThreadEntry::UserMessage(UserMessage {
1188                    id: message_id,
1189                    content,
1190                    chunks: vec![chunk],
1191                    checkpoint: None,
1192                }),
1193                cx,
1194            );
1195        }
1196    }
1197
1198    pub fn push_assistant_content_block(
1199        &mut self,
1200        chunk: acp::ContentBlock,
1201        is_thought: bool,
1202        cx: &mut Context<Self>,
1203    ) {
1204        let language_registry = self.project.read(cx).languages().clone();
1205        let path_style = self.project.read(cx).path_style(cx);
1206        let entries_len = self.entries.len();
1207        if let Some(last_entry) = self.entries.last_mut()
1208            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1209        {
1210            let idx = entries_len - 1;
1211            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1212            match (chunks.last_mut(), is_thought) {
1213                (Some(AssistantMessageChunk::Message { block }), false)
1214                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1215                    block.append(chunk, &language_registry, path_style, cx)
1216                }
1217                _ => {
1218                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1219                    if is_thought {
1220                        chunks.push(AssistantMessageChunk::Thought { block })
1221                    } else {
1222                        chunks.push(AssistantMessageChunk::Message { block })
1223                    }
1224                }
1225            }
1226        } else {
1227            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1228            let chunk = if is_thought {
1229                AssistantMessageChunk::Thought { block }
1230            } else {
1231                AssistantMessageChunk::Message { block }
1232            };
1233
1234            self.push_entry(
1235                AgentThreadEntry::AssistantMessage(AssistantMessage {
1236                    chunks: vec![chunk],
1237                }),
1238                cx,
1239            );
1240        }
1241    }
1242
1243    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1244        self.entries.push(entry);
1245        cx.emit(AcpThreadEvent::NewEntry);
1246    }
1247
1248    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1249        self.connection.set_title(&self.session_id, cx).is_some()
1250    }
1251
1252    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1253        if title != self.title {
1254            self.title = title.clone();
1255            cx.emit(AcpThreadEvent::TitleUpdated);
1256            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1257                return set_title.run(title, cx);
1258            }
1259        }
1260        Task::ready(Ok(()))
1261    }
1262
1263    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1264        self.token_usage = usage;
1265        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1266    }
1267
1268    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1269        cx.emit(AcpThreadEvent::Retry(status));
1270    }
1271
1272    pub fn update_tool_call(
1273        &mut self,
1274        update: impl Into<ToolCallUpdate>,
1275        cx: &mut Context<Self>,
1276    ) -> Result<()> {
1277        let update = update.into();
1278        let languages = self.project.read(cx).languages().clone();
1279        let path_style = self.project.read(cx).path_style(cx);
1280
1281        let ix = match self.index_for_tool_call(update.id()) {
1282            Some(ix) => ix,
1283            None => {
1284                // Tool call not found - create a failed tool call entry
1285                let failed_tool_call = ToolCall {
1286                    id: update.id().clone(),
1287                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1288                    kind: acp::ToolKind::Fetch,
1289                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1290                        acp::ContentBlock::Text(acp::TextContent {
1291                            text: "Tool call not found".to_string(),
1292                            annotations: None,
1293                            meta: None,
1294                        }),
1295                        &languages,
1296                        path_style,
1297                        cx,
1298                    ))],
1299                    status: ToolCallStatus::Failed,
1300                    locations: Vec::new(),
1301                    resolved_locations: Vec::new(),
1302                    raw_input: None,
1303                    raw_output: None,
1304                };
1305                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1306                return Ok(());
1307            }
1308        };
1309        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1310            unreachable!()
1311        };
1312
1313        match update {
1314            ToolCallUpdate::UpdateFields(update) => {
1315                let location_updated = update.fields.locations.is_some();
1316                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1317                if location_updated {
1318                    self.resolve_locations(update.id, cx);
1319                }
1320            }
1321            ToolCallUpdate::UpdateDiff(update) => {
1322                call.content.clear();
1323                call.content.push(ToolCallContent::Diff(update.diff));
1324            }
1325            ToolCallUpdate::UpdateTerminal(update) => {
1326                call.content.clear();
1327                call.content
1328                    .push(ToolCallContent::Terminal(update.terminal));
1329            }
1330        }
1331
1332        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1333
1334        Ok(())
1335    }
1336
1337    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1338    pub fn upsert_tool_call(
1339        &mut self,
1340        tool_call: acp::ToolCall,
1341        cx: &mut Context<Self>,
1342    ) -> Result<(), acp::Error> {
1343        let status = tool_call.status.into();
1344        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1345    }
1346
1347    /// Fails if id does not match an existing entry.
1348    pub fn upsert_tool_call_inner(
1349        &mut self,
1350        update: acp::ToolCallUpdate,
1351        status: ToolCallStatus,
1352        cx: &mut Context<Self>,
1353    ) -> Result<(), acp::Error> {
1354        let language_registry = self.project.read(cx).languages().clone();
1355        let path_style = self.project.read(cx).path_style(cx);
1356        let id = update.id.clone();
1357
1358        let agent = self.connection().telemetry_id();
1359        let session = self.session_id();
1360        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1361            let status = if matches!(status, ToolCallStatus::Completed) {
1362                "completed"
1363            } else {
1364                "failed"
1365            };
1366            telemetry::event!("Agent Tool Call Completed", agent, session, status);
1367        }
1368
1369        if let Some(ix) = self.index_for_tool_call(&id) {
1370            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1371                unreachable!()
1372            };
1373
1374            call.update_fields(
1375                update.fields,
1376                language_registry,
1377                path_style,
1378                &self.terminals,
1379                cx,
1380            )?;
1381            call.status = status;
1382
1383            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1384        } else {
1385            let call = ToolCall::from_acp(
1386                update.try_into()?,
1387                status,
1388                language_registry,
1389                self.project.read(cx).path_style(cx),
1390                &self.terminals,
1391                cx,
1392            )?;
1393            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1394        };
1395
1396        self.resolve_locations(id, cx);
1397        Ok(())
1398    }
1399
1400    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1401        self.entries
1402            .iter()
1403            .enumerate()
1404            .rev()
1405            .find_map(|(index, entry)| {
1406                if let AgentThreadEntry::ToolCall(tool_call) = entry
1407                    && &tool_call.id == id
1408                {
1409                    Some(index)
1410                } else {
1411                    None
1412                }
1413            })
1414    }
1415
1416    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1417        // The tool call we are looking for is typically the last one, or very close to the end.
1418        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1419        self.entries
1420            .iter_mut()
1421            .enumerate()
1422            .rev()
1423            .find_map(|(index, tool_call)| {
1424                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1425                    && &tool_call.id == id
1426                {
1427                    Some((index, tool_call))
1428                } else {
1429                    None
1430                }
1431            })
1432    }
1433
1434    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1435        self.entries
1436            .iter()
1437            .enumerate()
1438            .rev()
1439            .find_map(|(index, tool_call)| {
1440                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1441                    && &tool_call.id == id
1442                {
1443                    Some((index, tool_call))
1444                } else {
1445                    None
1446                }
1447            })
1448    }
1449
1450    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1451        let project = self.project.clone();
1452        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1453            return;
1454        };
1455        let task = tool_call.resolve_locations(project, cx);
1456        cx.spawn(async move |this, cx| {
1457            let resolved_locations = task.await;
1458
1459            this.update(cx, |this, cx| {
1460                let project = this.project.clone();
1461
1462                for location in resolved_locations.iter().flatten() {
1463                    this.shared_buffers
1464                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1465                }
1466                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1467                    return;
1468                };
1469
1470                if let Some(Some(location)) = resolved_locations.last() {
1471                    project.update(cx, |project, cx| {
1472                        let should_ignore = if let Some(agent_location) = project
1473                            .agent_location()
1474                            .filter(|agent_location| agent_location.buffer == location.buffer)
1475                        {
1476                            let snapshot = location.buffer.read(cx).snapshot();
1477                            let old_position = agent_location.position.to_point(&snapshot);
1478                            let new_position = location.position.to_point(&snapshot);
1479
1480                            // ignore this so that when we get updates from the edit tool
1481                            // the position doesn't reset to the startof line
1482                            old_position.row == new_position.row
1483                                && old_position.column > new_position.column
1484                        } else {
1485                            false
1486                        };
1487                        if !should_ignore {
1488                            project.set_agent_location(Some(location.into()), cx);
1489                        }
1490                    });
1491                }
1492
1493                let resolved_locations = resolved_locations
1494                    .iter()
1495                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1496                    .collect::<Vec<_>>();
1497
1498                if tool_call.resolved_locations != resolved_locations {
1499                    tool_call.resolved_locations = resolved_locations;
1500                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1501                }
1502            })
1503        })
1504        .detach();
1505    }
1506
1507    pub fn request_tool_call_authorization(
1508        &mut self,
1509        tool_call: acp::ToolCallUpdate,
1510        options: Vec<acp::PermissionOption>,
1511        respect_always_allow_setting: bool,
1512        cx: &mut Context<Self>,
1513    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1514        let (tx, rx) = oneshot::channel();
1515
1516        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1517            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1518            // some tools would (incorrectly) continue to auto-accept.
1519            if let Some(allow_once_option) = options.iter().find_map(|option| {
1520                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1521                    Some(option.id.clone())
1522                } else {
1523                    None
1524                }
1525            }) {
1526                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1527                return Ok(async {
1528                    acp::RequestPermissionOutcome::Selected {
1529                        option_id: allow_once_option,
1530                    }
1531                }
1532                .boxed());
1533            }
1534        }
1535
1536        let status = ToolCallStatus::WaitingForConfirmation {
1537            options,
1538            respond_tx: tx,
1539        };
1540
1541        self.upsert_tool_call_inner(tool_call, status, cx)?;
1542        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1543
1544        let fut = async {
1545            match rx.await {
1546                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1547                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1548            }
1549        }
1550        .boxed();
1551
1552        Ok(fut)
1553    }
1554
1555    pub fn authorize_tool_call(
1556        &mut self,
1557        id: acp::ToolCallId,
1558        option_id: acp::PermissionOptionId,
1559        option_kind: acp::PermissionOptionKind,
1560        cx: &mut Context<Self>,
1561    ) {
1562        let Some((ix, call)) = self.tool_call_mut(&id) else {
1563            return;
1564        };
1565
1566        let new_status = match option_kind {
1567            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1568                ToolCallStatus::Rejected
1569            }
1570            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1571                ToolCallStatus::InProgress
1572            }
1573        };
1574
1575        let curr_status = mem::replace(&mut call.status, new_status);
1576
1577        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1578            respond_tx.send(option_id).log_err();
1579        } else if cfg!(debug_assertions) {
1580            panic!("tried to authorize an already authorized tool call");
1581        }
1582
1583        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1584    }
1585
1586    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1587        let mut first_tool_call = None;
1588
1589        for entry in self.entries.iter().rev() {
1590            match &entry {
1591                AgentThreadEntry::ToolCall(call) => {
1592                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1593                        first_tool_call = Some(call);
1594                    } else {
1595                        continue;
1596                    }
1597                }
1598                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1599                    // Reached the beginning of the turn.
1600                    // If we had pending permission requests in the previous turn, they have been cancelled.
1601                    break;
1602                }
1603            }
1604        }
1605
1606        first_tool_call
1607    }
1608
1609    pub fn plan(&self) -> &Plan {
1610        &self.plan
1611    }
1612
1613    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1614        let new_entries_len = request.entries.len();
1615        let mut new_entries = request.entries.into_iter();
1616
1617        // Reuse existing markdown to prevent flickering
1618        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1619            let PlanEntry {
1620                content,
1621                priority,
1622                status,
1623            } = old;
1624            content.update(cx, |old, cx| {
1625                old.replace(new.content, cx);
1626            });
1627            *priority = new.priority;
1628            *status = new.status;
1629        }
1630        for new in new_entries {
1631            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1632        }
1633        self.plan.entries.truncate(new_entries_len);
1634
1635        cx.notify();
1636    }
1637
1638    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1639        self.plan
1640            .entries
1641            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1642        cx.notify();
1643    }
1644
1645    #[cfg(any(test, feature = "test-support"))]
1646    pub fn send_raw(
1647        &mut self,
1648        message: &str,
1649        cx: &mut Context<Self>,
1650    ) -> BoxFuture<'static, Result<()>> {
1651        self.send(
1652            vec![acp::ContentBlock::Text(acp::TextContent {
1653                text: message.to_string(),
1654                annotations: None,
1655                meta: None,
1656            })],
1657            cx,
1658        )
1659    }
1660
1661    pub fn send(
1662        &mut self,
1663        message: Vec<acp::ContentBlock>,
1664        cx: &mut Context<Self>,
1665    ) -> BoxFuture<'static, Result<()>> {
1666        let block = ContentBlock::new_combined(
1667            message.clone(),
1668            self.project.read(cx).languages().clone(),
1669            self.project.read(cx).path_style(cx),
1670            cx,
1671        );
1672        let request = acp::PromptRequest {
1673            prompt: message.clone(),
1674            session_id: self.session_id.clone(),
1675            meta: None,
1676        };
1677        let git_store = self.project.read(cx).git_store().clone();
1678
1679        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1680            Some(UserMessageId::new())
1681        } else {
1682            None
1683        };
1684
1685        self.run_turn(cx, async move |this, cx| {
1686            this.update(cx, |this, cx| {
1687                this.push_entry(
1688                    AgentThreadEntry::UserMessage(UserMessage {
1689                        id: message_id.clone(),
1690                        content: block,
1691                        chunks: message,
1692                        checkpoint: None,
1693                    }),
1694                    cx,
1695                );
1696            })
1697            .ok();
1698
1699            let old_checkpoint = git_store
1700                .update(cx, |git, cx| git.checkpoint(cx))?
1701                .await
1702                .context("failed to get old checkpoint")
1703                .log_err();
1704            this.update(cx, |this, cx| {
1705                if let Some((_ix, message)) = this.last_user_message() {
1706                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1707                        git_checkpoint,
1708                        show: false,
1709                    });
1710                }
1711                this.connection.prompt(message_id, request, cx)
1712            })?
1713            .await
1714        })
1715    }
1716
1717    pub fn can_resume(&self, cx: &App) -> bool {
1718        self.connection.resume(&self.session_id, cx).is_some()
1719    }
1720
1721    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1722        self.run_turn(cx, async move |this, cx| {
1723            this.update(cx, |this, cx| {
1724                this.connection
1725                    .resume(&this.session_id, cx)
1726                    .map(|resume| resume.run(cx))
1727            })?
1728            .context("resuming a session is not supported")?
1729            .await
1730        })
1731    }
1732
1733    fn run_turn(
1734        &mut self,
1735        cx: &mut Context<Self>,
1736        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1737    ) -> BoxFuture<'static, Result<()>> {
1738        self.clear_completed_plan_entries(cx);
1739
1740        let (tx, rx) = oneshot::channel();
1741        let cancel_task = self.cancel(cx);
1742
1743        self.send_task = Some(cx.spawn(async move |this, cx| {
1744            cancel_task.await;
1745            tx.send(f(this, cx).await).ok();
1746        }));
1747
1748        cx.spawn(async move |this, cx| {
1749            let response = rx.await;
1750
1751            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1752                .await?;
1753
1754            this.update(cx, |this, cx| {
1755                this.project
1756                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1757                match response {
1758                    Ok(Err(e)) => {
1759                        this.send_task.take();
1760                        cx.emit(AcpThreadEvent::Error);
1761                        Err(e)
1762                    }
1763                    result => {
1764                        let canceled = matches!(
1765                            result,
1766                            Ok(Ok(acp::PromptResponse {
1767                                stop_reason: acp::StopReason::Cancelled,
1768                                meta: None,
1769                            }))
1770                        );
1771
1772                        // We only take the task if the current prompt wasn't canceled.
1773                        //
1774                        // This prompt may have been canceled because another one was sent
1775                        // while it was still generating. In these cases, dropping `send_task`
1776                        // would cause the next generation to be canceled.
1777                        if !canceled {
1778                            this.send_task.take();
1779                        }
1780
1781                        // Handle refusal - distinguish between user prompt and tool call refusals
1782                        if let Ok(Ok(acp::PromptResponse {
1783                            stop_reason: acp::StopReason::Refusal,
1784                            meta: _,
1785                        })) = result
1786                        {
1787                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1788                                // Check if there's a completed tool call with results after the last user message
1789                                // This indicates the refusal is in response to tool output, not the user's prompt
1790                                let has_completed_tool_call_after_user_msg =
1791                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1792                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1793                                            // Check if the tool call has completed and has output
1794                                            matches!(tool_call.status, ToolCallStatus::Completed)
1795                                                && tool_call.raw_output.is_some()
1796                                        } else {
1797                                            false
1798                                        }
1799                                    });
1800
1801                                if has_completed_tool_call_after_user_msg {
1802                                    // Refusal is due to tool output - don't truncate, just notify
1803                                    // The model refused based on what the tool returned
1804                                    cx.emit(AcpThreadEvent::Refusal);
1805                                } else {
1806                                    // User prompt was refused - truncate back to before the user message
1807                                    let range = user_msg_ix..this.entries.len();
1808                                    if range.start < range.end {
1809                                        this.entries.truncate(user_msg_ix);
1810                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1811                                    }
1812                                    cx.emit(AcpThreadEvent::Refusal);
1813                                }
1814                            } else {
1815                                // No user message found, treat as general refusal
1816                                cx.emit(AcpThreadEvent::Refusal);
1817                            }
1818                        }
1819
1820                        cx.emit(AcpThreadEvent::Stopped);
1821                        Ok(())
1822                    }
1823                }
1824            })?
1825        })
1826        .boxed()
1827    }
1828
1829    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1830        let Some(send_task) = self.send_task.take() else {
1831            return Task::ready(());
1832        };
1833
1834        for entry in self.entries.iter_mut() {
1835            if let AgentThreadEntry::ToolCall(call) = entry {
1836                let cancel = matches!(
1837                    call.status,
1838                    ToolCallStatus::Pending
1839                        | ToolCallStatus::WaitingForConfirmation { .. }
1840                        | ToolCallStatus::InProgress
1841                );
1842
1843                if cancel {
1844                    call.status = ToolCallStatus::Canceled;
1845                }
1846            }
1847        }
1848
1849        self.connection.cancel(&self.session_id, cx);
1850
1851        // Wait for the send task to complete
1852        cx.foreground_executor().spawn(send_task)
1853    }
1854
1855    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1856    pub fn restore_checkpoint(
1857        &mut self,
1858        id: UserMessageId,
1859        cx: &mut Context<Self>,
1860    ) -> Task<Result<()>> {
1861        let Some((_, message)) = self.user_message_mut(&id) else {
1862            return Task::ready(Err(anyhow!("message not found")));
1863        };
1864
1865        let checkpoint = message
1866            .checkpoint
1867            .as_ref()
1868            .map(|c| c.git_checkpoint.clone());
1869        let rewind = self.rewind(id.clone(), cx);
1870        let git_store = self.project.read(cx).git_store().clone();
1871
1872        cx.spawn(async move |_, cx| {
1873            rewind.await?;
1874            if let Some(checkpoint) = checkpoint {
1875                git_store
1876                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1877                    .await?;
1878            }
1879
1880            Ok(())
1881        })
1882    }
1883
1884    /// Rewinds this thread to before the entry at `index`, removing it and all
1885    /// subsequent entries while rejecting any action_log changes made from that point.
1886    /// Unlike `restore_checkpoint`, this method does not restore from git.
1887    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1888        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1889            return Task::ready(Err(anyhow!("not supported")));
1890        };
1891
1892        let telemetry = ActionLogTelemetry::from(&*self);
1893        cx.spawn(async move |this, cx| {
1894            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1895            this.update(cx, |this, cx| {
1896                if let Some((ix, _)) = this.user_message_mut(&id) {
1897                    let range = ix..this.entries.len();
1898                    this.entries.truncate(ix);
1899                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1900                }
1901                this.action_log().update(cx, |action_log, cx| {
1902                    action_log.reject_all_edits(Some(telemetry), cx)
1903                })
1904            })?
1905            .await;
1906            Ok(())
1907        })
1908    }
1909
1910    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1911        let git_store = self.project.read(cx).git_store().clone();
1912
1913        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1914            if let Some(checkpoint) = message.checkpoint.as_ref() {
1915                checkpoint.git_checkpoint.clone()
1916            } else {
1917                return Task::ready(Ok(()));
1918            }
1919        } else {
1920            return Task::ready(Ok(()));
1921        };
1922
1923        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1924        cx.spawn(async move |this, cx| {
1925            let new_checkpoint = new_checkpoint
1926                .await
1927                .context("failed to get new checkpoint")
1928                .log_err();
1929            if let Some(new_checkpoint) = new_checkpoint {
1930                let equal = git_store
1931                    .update(cx, |git, cx| {
1932                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1933                    })?
1934                    .await
1935                    .unwrap_or(true);
1936                this.update(cx, |this, cx| {
1937                    let (ix, message) = this.last_user_message().context("no user message")?;
1938                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1939                    checkpoint.show = !equal;
1940                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1941                    anyhow::Ok(())
1942                })??;
1943            }
1944
1945            Ok(())
1946        })
1947    }
1948
1949    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1950        self.entries
1951            .iter_mut()
1952            .enumerate()
1953            .rev()
1954            .find_map(|(ix, entry)| {
1955                if let AgentThreadEntry::UserMessage(message) = entry {
1956                    Some((ix, message))
1957                } else {
1958                    None
1959                }
1960            })
1961    }
1962
1963    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1964        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1965            if let AgentThreadEntry::UserMessage(message) = entry {
1966                if message.id.as_ref() == Some(id) {
1967                    Some((ix, message))
1968                } else {
1969                    None
1970                }
1971            } else {
1972                None
1973            }
1974        })
1975    }
1976
1977    pub fn read_text_file(
1978        &self,
1979        path: PathBuf,
1980        line: Option<u32>,
1981        limit: Option<u32>,
1982        reuse_shared_snapshot: bool,
1983        cx: &mut Context<Self>,
1984    ) -> Task<Result<String, acp::Error>> {
1985        // Args are 1-based, move to 0-based
1986        let line = line.unwrap_or_default().saturating_sub(1);
1987        let limit = limit.unwrap_or(u32::MAX);
1988        let project = self.project.clone();
1989        let action_log = self.action_log.clone();
1990        cx.spawn(async move |this, cx| {
1991            let load = project
1992                .update(cx, |project, cx| {
1993                    let path = project
1994                        .project_path_for_absolute_path(&path, cx)
1995                        .ok_or_else(|| {
1996                            acp::Error::resource_not_found(Some(path.display().to_string()))
1997                        })?;
1998                    Ok(project.open_buffer(path, cx))
1999                })
2000                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
2001                .flatten()?;
2002
2003            let buffer = load.await?;
2004
2005            let snapshot = if reuse_shared_snapshot {
2006                this.read_with(cx, |this, _| {
2007                    this.shared_buffers.get(&buffer.clone()).cloned()
2008                })
2009                .log_err()
2010                .flatten()
2011            } else {
2012                None
2013            };
2014
2015            let snapshot = if let Some(snapshot) = snapshot {
2016                snapshot
2017            } else {
2018                action_log.update(cx, |action_log, cx| {
2019                    action_log.buffer_read(buffer.clone(), cx);
2020                })?;
2021
2022                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2023                this.update(cx, |this, _| {
2024                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2025                })?;
2026                snapshot
2027            };
2028
2029            let max_point = snapshot.max_point();
2030            let start_position = Point::new(line, 0);
2031
2032            if start_position > max_point {
2033                return Err(acp::Error::invalid_params().with_data(format!(
2034                    "Attempting to read beyond the end of the file, line {}:{}",
2035                    max_point.row + 1,
2036                    max_point.column
2037                )));
2038            }
2039
2040            let start = snapshot.anchor_before(start_position);
2041            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2042
2043            project.update(cx, |project, cx| {
2044                project.set_agent_location(
2045                    Some(AgentLocation {
2046                        buffer: buffer.downgrade(),
2047                        position: start,
2048                    }),
2049                    cx,
2050                );
2051            })?;
2052
2053            Ok(snapshot.text_for_range(start..end).collect::<String>())
2054        })
2055    }
2056
2057    pub fn write_text_file(
2058        &self,
2059        path: PathBuf,
2060        content: String,
2061        cx: &mut Context<Self>,
2062    ) -> Task<Result<()>> {
2063        let project = self.project.clone();
2064        let action_log = self.action_log.clone();
2065        cx.spawn(async move |this, cx| {
2066            let load = project.update(cx, |project, cx| {
2067                let path = project
2068                    .project_path_for_absolute_path(&path, cx)
2069                    .context("invalid path")?;
2070                anyhow::Ok(project.open_buffer(path, cx))
2071            });
2072            let buffer = load??.await?;
2073            let snapshot = this.update(cx, |this, cx| {
2074                this.shared_buffers
2075                    .get(&buffer)
2076                    .cloned()
2077                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2078            })?;
2079            let edits = cx
2080                .background_executor()
2081                .spawn(async move {
2082                    let old_text = snapshot.text();
2083                    text_diff(old_text.as_str(), &content)
2084                        .into_iter()
2085                        .map(|(range, replacement)| {
2086                            (
2087                                snapshot.anchor_after(range.start)
2088                                    ..snapshot.anchor_before(range.end),
2089                                replacement,
2090                            )
2091                        })
2092                        .collect::<Vec<_>>()
2093                })
2094                .await;
2095
2096            project.update(cx, |project, cx| {
2097                project.set_agent_location(
2098                    Some(AgentLocation {
2099                        buffer: buffer.downgrade(),
2100                        position: edits
2101                            .last()
2102                            .map(|(range, _)| range.end)
2103                            .unwrap_or(Anchor::MIN),
2104                    }),
2105                    cx,
2106                );
2107            })?;
2108
2109            let format_on_save = cx.update(|cx| {
2110                action_log.update(cx, |action_log, cx| {
2111                    action_log.buffer_read(buffer.clone(), cx);
2112                });
2113
2114                let format_on_save = buffer.update(cx, |buffer, cx| {
2115                    buffer.edit(edits, None, cx);
2116
2117                    let settings = language::language_settings::language_settings(
2118                        buffer.language().map(|l| l.name()),
2119                        buffer.file(),
2120                        cx,
2121                    );
2122
2123                    settings.format_on_save != FormatOnSave::Off
2124                });
2125                action_log.update(cx, |action_log, cx| {
2126                    action_log.buffer_edited(buffer.clone(), cx);
2127                });
2128                format_on_save
2129            })?;
2130
2131            if format_on_save {
2132                let format_task = project.update(cx, |project, cx| {
2133                    project.format(
2134                        HashSet::from_iter([buffer.clone()]),
2135                        LspFormatTarget::Buffers,
2136                        false,
2137                        FormatTrigger::Save,
2138                        cx,
2139                    )
2140                })?;
2141                format_task.await.log_err();
2142
2143                action_log.update(cx, |action_log, cx| {
2144                    action_log.buffer_edited(buffer.clone(), cx);
2145                })?;
2146            }
2147
2148            project
2149                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2150                .await
2151        })
2152    }
2153
2154    pub fn create_terminal(
2155        &self,
2156        command: String,
2157        args: Vec<String>,
2158        extra_env: Vec<acp::EnvVariable>,
2159        cwd: Option<PathBuf>,
2160        output_byte_limit: Option<u64>,
2161        cx: &mut Context<Self>,
2162    ) -> Task<Result<Entity<Terminal>>> {
2163        let env = match &cwd {
2164            Some(dir) => self.project.update(cx, |project, cx| {
2165                project.environment().update(cx, |env, cx| {
2166                    env.directory_environment(dir.as_path().into(), cx)
2167                })
2168            }),
2169            None => Task::ready(None).shared(),
2170        };
2171        let env = cx.spawn(async move |_, _| {
2172            let mut env = env.await.unwrap_or_default();
2173            // Disables paging for `git` and hopefully other commands
2174            env.insert("PAGER".into(), "".into());
2175            for var in extra_env {
2176                env.insert(var.name, var.value);
2177            }
2178            env
2179        });
2180
2181        let project = self.project.clone();
2182        let language_registry = project.read(cx).languages().clone();
2183        let is_windows = project.read(cx).path_style(cx).is_windows();
2184
2185        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2186        let terminal_task = cx.spawn({
2187            let terminal_id = terminal_id.clone();
2188            async move |_this, cx| {
2189                let env = env.await;
2190                let shell = project
2191                    .update(cx, |project, cx| {
2192                        project
2193                            .remote_client()
2194                            .and_then(|r| r.read(cx).default_system_shell())
2195                    })?
2196                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2197                let (task_command, task_args) =
2198                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2199                        .redirect_stdin_to_dev_null()
2200                        .build(Some(command.clone()), &args);
2201                let terminal = project
2202                    .update(cx, |project, cx| {
2203                        project.create_terminal_task(
2204                            task::SpawnInTerminal {
2205                                command: Some(task_command),
2206                                args: task_args,
2207                                cwd: cwd.clone(),
2208                                env,
2209                                ..Default::default()
2210                            },
2211                            cx,
2212                        )
2213                    })?
2214                    .await?;
2215
2216                cx.new(|cx| {
2217                    Terminal::new(
2218                        terminal_id,
2219                        &format!("{} {}", command, args.join(" ")),
2220                        cwd,
2221                        output_byte_limit.map(|l| l as usize),
2222                        terminal,
2223                        language_registry,
2224                        cx,
2225                    )
2226                })
2227            }
2228        });
2229
2230        cx.spawn(async move |this, cx| {
2231            let terminal = terminal_task.await?;
2232            this.update(cx, |this, _cx| {
2233                this.terminals.insert(terminal_id, terminal.clone());
2234                terminal
2235            })
2236        })
2237    }
2238
2239    pub fn kill_terminal(
2240        &mut self,
2241        terminal_id: acp::TerminalId,
2242        cx: &mut Context<Self>,
2243    ) -> Result<()> {
2244        self.terminals
2245            .get(&terminal_id)
2246            .context("Terminal not found")?
2247            .update(cx, |terminal, cx| {
2248                terminal.kill(cx);
2249            });
2250
2251        Ok(())
2252    }
2253
2254    pub fn release_terminal(
2255        &mut self,
2256        terminal_id: acp::TerminalId,
2257        cx: &mut Context<Self>,
2258    ) -> Result<()> {
2259        self.terminals
2260            .remove(&terminal_id)
2261            .context("Terminal not found")?
2262            .update(cx, |terminal, cx| {
2263                terminal.kill(cx);
2264            });
2265
2266        Ok(())
2267    }
2268
2269    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2270        self.terminals
2271            .get(&terminal_id)
2272            .context("Terminal not found")
2273            .cloned()
2274    }
2275
2276    pub fn to_markdown(&self, cx: &App) -> String {
2277        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2278    }
2279
2280    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2281        cx.emit(AcpThreadEvent::LoadError(error));
2282    }
2283
2284    pub fn register_terminal_created(
2285        &mut self,
2286        terminal_id: acp::TerminalId,
2287        command_label: String,
2288        working_dir: Option<PathBuf>,
2289        output_byte_limit: Option<u64>,
2290        terminal: Entity<::terminal::Terminal>,
2291        cx: &mut Context<Self>,
2292    ) -> Entity<Terminal> {
2293        let language_registry = self.project.read(cx).languages().clone();
2294
2295        let entity = cx.new(|cx| {
2296            Terminal::new(
2297                terminal_id.clone(),
2298                &command_label,
2299                working_dir.clone(),
2300                output_byte_limit.map(|l| l as usize),
2301                terminal,
2302                language_registry,
2303                cx,
2304            )
2305        });
2306        self.terminals.insert(terminal_id.clone(), entity.clone());
2307        entity
2308    }
2309}
2310
2311fn markdown_for_raw_output(
2312    raw_output: &serde_json::Value,
2313    language_registry: &Arc<LanguageRegistry>,
2314    cx: &mut App,
2315) -> Option<Entity<Markdown>> {
2316    match raw_output {
2317        serde_json::Value::Null => None,
2318        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2319            Markdown::new(
2320                value.to_string().into(),
2321                Some(language_registry.clone()),
2322                None,
2323                cx,
2324            )
2325        })),
2326        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2327            Markdown::new(
2328                value.to_string().into(),
2329                Some(language_registry.clone()),
2330                None,
2331                cx,
2332            )
2333        })),
2334        serde_json::Value::String(value) => Some(cx.new(|cx| {
2335            Markdown::new(
2336                value.clone().into(),
2337                Some(language_registry.clone()),
2338                None,
2339                cx,
2340            )
2341        })),
2342        value => Some(cx.new(|cx| {
2343            Markdown::new(
2344                format!("```json\n{}\n```", value).into(),
2345                Some(language_registry.clone()),
2346                None,
2347                cx,
2348            )
2349        })),
2350    }
2351}
2352
2353#[cfg(test)]
2354mod tests {
2355    use super::*;
2356    use anyhow::anyhow;
2357    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2358    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2359    use indoc::indoc;
2360    use project::{FakeFs, Fs};
2361    use rand::{distr, prelude::*};
2362    use serde_json::json;
2363    use settings::SettingsStore;
2364    use smol::stream::StreamExt as _;
2365    use std::{
2366        any::Any,
2367        cell::RefCell,
2368        path::Path,
2369        rc::Rc,
2370        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2371        time::Duration,
2372    };
2373    use util::path;
2374
    // Shared test setup: initializes logging (ignoring the error if a logger is already
    // installed), installs a test `SettingsStore` as a global and initializes project
    // settings and the language crate.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            // Built in its own statement because constructing the store also needs `cx`.
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }
2384
    // Verifies that terminal output delivered before the terminal's `Created` event is
    // not lost: once `Created` arrives, the terminal's content contains the early output.
    #[gpui::test]
    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // The id is made up here; no terminal is registered under it yet.
        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());

        // Send Output BEFORE Created - should be buffered by acp_thread
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"hello buffered".to_vec(),
                },
                cx,
            );
        });

        // Create a display-only terminal and then send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // After Created, buffered Output should have been flushed into the renderer
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("hello buffered"),
            "expected buffered output to render, got: {content}"
        );
    }
2446
    // Verifies that output delivered before `Created` still renders when an `Exit` event
    // also arrives before `Created`.
    //
    // NOTE(review): only the rendered output is asserted; the buffered exit status itself
    // is not checked by this test.
    #[gpui::test]
    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // The id is made up here; no terminal is registered under it yet.
        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());

        // Send Output BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"pre-exit data".to_vec(),
                },
                cx,
            );
        });

        // Send Exit BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Exit {
                    terminal_id: terminal_id.clone(),
                    status: acp::TerminalExitStatus {
                        exit_code: Some(0),
                        signal: None,
                        meta: None,
                    },
                },
                cx,
            );
        });

        // Now create a display-only lower-level terminal and send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Exit Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // Output should be present after Created (flushed from buffer)
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("pre-exit data"),
            "expected pre-exit data to render, got: {content}"
        );
    }
2523
2524    #[gpui::test]
2525    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2526        init_test(cx);
2527
2528        let fs = FakeFs::new(cx.executor());
2529        let project = Project::test(fs, [], cx).await;
2530        let connection = Rc::new(FakeAgentConnection::new());
2531        let thread = cx
2532            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2533            .await
2534            .unwrap();
2535
2536        // Test creating a new user message
2537        thread.update(cx, |thread, cx| {
2538            thread.push_user_content_block(
2539                None,
2540                acp::ContentBlock::Text(acp::TextContent {
2541                    annotations: None,
2542                    text: "Hello, ".to_string(),
2543                    meta: None,
2544                }),
2545                cx,
2546            );
2547        });
2548
2549        thread.update(cx, |thread, cx| {
2550            assert_eq!(thread.entries.len(), 1);
2551            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2552                assert_eq!(user_msg.id, None);
2553                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2554            } else {
2555                panic!("Expected UserMessage");
2556            }
2557        });
2558
2559        // Test appending to existing user message
2560        let message_1_id = UserMessageId::new();
2561        thread.update(cx, |thread, cx| {
2562            thread.push_user_content_block(
2563                Some(message_1_id.clone()),
2564                acp::ContentBlock::Text(acp::TextContent {
2565                    annotations: None,
2566                    text: "world!".to_string(),
2567                    meta: None,
2568                }),
2569                cx,
2570            );
2571        });
2572
2573        thread.update(cx, |thread, cx| {
2574            assert_eq!(thread.entries.len(), 1);
2575            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2576                assert_eq!(user_msg.id, Some(message_1_id));
2577                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2578            } else {
2579                panic!("Expected UserMessage");
2580            }
2581        });
2582
2583        // Test creating new user message after assistant message
2584        thread.update(cx, |thread, cx| {
2585            thread.push_assistant_content_block(
2586                acp::ContentBlock::Text(acp::TextContent {
2587                    annotations: None,
2588                    text: "Assistant response".to_string(),
2589                    meta: None,
2590                }),
2591                false,
2592                cx,
2593            );
2594        });
2595
2596        let message_2_id = UserMessageId::new();
2597        thread.update(cx, |thread, cx| {
2598            thread.push_user_content_block(
2599                Some(message_2_id.clone()),
2600                acp::ContentBlock::Text(acp::TextContent {
2601                    annotations: None,
2602                    text: "New user message".to_string(),
2603                    meta: None,
2604                }),
2605                cx,
2606            );
2607        });
2608
2609        thread.update(cx, |thread, cx| {
2610            assert_eq!(thread.entries.len(), 3);
2611            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2612                assert_eq!(user_msg.id, Some(message_2_id));
2613                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2614            } else {
2615                panic!("Expected UserMessage at index 2");
2616            }
2617        });
2618    }
2619
2620    #[gpui::test]
2621    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2622        init_test(cx);
2623
2624        let fs = FakeFs::new(cx.executor());
2625        let project = Project::test(fs, [], cx).await;
2626        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2627            |_, thread, mut cx| {
2628                async move {
2629                    thread.update(&mut cx, |thread, cx| {
2630                        thread
2631                            .handle_session_update(
2632                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2633                                    content: "Thinking ".into(),
2634                                    meta: None,
2635                                }),
2636                                cx,
2637                            )
2638                            .unwrap();
2639                        thread
2640                            .handle_session_update(
2641                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2642                                    content: "hard!".into(),
2643                                    meta: None,
2644                                }),
2645                                cx,
2646                            )
2647                            .unwrap();
2648                    })?;
2649                    Ok(acp::PromptResponse {
2650                        stop_reason: acp::StopReason::EndTurn,
2651                        meta: None,
2652                    })
2653                }
2654                .boxed_local()
2655            },
2656        ));
2657
2658        let thread = cx
2659            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2660            .await
2661            .unwrap();
2662
2663        thread
2664            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2665            .await
2666            .unwrap();
2667
2668        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2669        assert_eq!(
2670            output,
2671            indoc! {r#"
2672            ## User
2673
2674            Hello from Zed!
2675
2676            ## Assistant
2677
2678            <thinking>
2679            Thinking hard!
2680            </thinking>
2681
2682            "#}
2683        );
2684    }
2685
    // Simulates the user editing a buffer in the window between the agent reading a file and the
    // agent writing it back, and asserts that both the user's edit and the agent's additions end
    // up in the buffer and on disk.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // One-shot signal fired by the agent right after it has read the file. The sender is
        // wrapped in `Rc<RefCell<Option<_>>>` so the handler can take it out and consume it.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    // Step 1: the agent reads the original file contents.
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    // Step 2: tell the test body that the read has happened.
                    read_file_tx.take().unwrap().send(()).unwrap();
                    // Step 3: the agent writes the file with two extra lines appended to what it
                    // read.
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                        meta: None,
                    })
                }
                .boxed_local()
            },
        ));

        // Open the same file as a buffer so the test can play the role of the user.
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Start the prompt but do not await it yet; the user edit below must land while the
        // request is still in flight.
        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        // Wait for the agent's read, then insert a line at the very start of the buffer.
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        // Both the user's "zero" line and the agent's "four"/"five" lines must be present, in the
        // buffer and in the file on disk.
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
2765
2766    #[gpui::test]
2767    async fn test_reading_from_line(cx: &mut TestAppContext) {
2768        init_test(cx);
2769
2770        let fs = FakeFs::new(cx.executor());
2771        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2772            .await;
2773        let project = Project::test(fs.clone(), [], cx).await;
2774        project
2775            .update(cx, |project, cx| {
2776                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2777            })
2778            .await
2779            .unwrap();
2780
2781        let connection = Rc::new(FakeAgentConnection::new());
2782
2783        let thread = cx
2784            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2785            .await
2786            .unwrap();
2787
2788        // Whole file
2789        let content = thread
2790            .update(cx, |thread, cx| {
2791                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2792            })
2793            .await
2794            .unwrap();
2795
2796        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2797
2798        // Only start line
2799        let content = thread
2800            .update(cx, |thread, cx| {
2801                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2802            })
2803            .await
2804            .unwrap();
2805
2806        assert_eq!(content, "three\nfour\n");
2807
2808        // Only limit
2809        let content = thread
2810            .update(cx, |thread, cx| {
2811                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2812            })
2813            .await
2814            .unwrap();
2815
2816        assert_eq!(content, "one\ntwo\n");
2817
2818        // Range
2819        let content = thread
2820            .update(cx, |thread, cx| {
2821                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2822            })
2823            .await
2824            .unwrap();
2825
2826        assert_eq!(content, "two\nthree\n");
2827
2828        // Invalid
2829        let err = thread
2830            .update(cx, |thread, cx| {
2831                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2832            })
2833            .await
2834            .unwrap_err();
2835
2836        assert_eq!(
2837            err.to_string(),
2838            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2839        );
2840    }
2841
2842    #[gpui::test]
2843    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2844        init_test(cx);
2845
2846        let fs = FakeFs::new(cx.executor());
2847        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2848        let project = Project::test(fs.clone(), [], cx).await;
2849        project
2850            .update(cx, |project, cx| {
2851                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2852            })
2853            .await
2854            .unwrap();
2855
2856        let connection = Rc::new(FakeAgentConnection::new());
2857
2858        let thread = cx
2859            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2860            .await
2861            .unwrap();
2862
2863        // Whole file
2864        let content = thread
2865            .update(cx, |thread, cx| {
2866                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2867            })
2868            .await
2869            .unwrap();
2870
2871        assert_eq!(content, "");
2872
2873        // Only start line
2874        let content = thread
2875            .update(cx, |thread, cx| {
2876                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2877            })
2878            .await
2879            .unwrap();
2880
2881        assert_eq!(content, "");
2882
2883        // Only limit
2884        let content = thread
2885            .update(cx, |thread, cx| {
2886                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2887            })
2888            .await
2889            .unwrap();
2890
2891        assert_eq!(content, "");
2892
2893        // Range
2894        let content = thread
2895            .update(cx, |thread, cx| {
2896                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2897            })
2898            .await
2899            .unwrap();
2900
2901        assert_eq!(content, "");
2902
2903        // Invalid
2904        let err = thread
2905            .update(cx, |thread, cx| {
2906                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2907            })
2908            .await
2909            .unwrap_err();
2910
2911        assert_eq!(
2912            err.to_string(),
2913            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2914        );
2915    }
2916    #[gpui::test]
2917    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2918        init_test(cx);
2919
2920        let fs = FakeFs::new(cx.executor());
2921        fs.insert_tree(path!("/tmp"), json!({})).await;
2922        let project = Project::test(fs.clone(), [], cx).await;
2923        project
2924            .update(cx, |project, cx| {
2925                project.find_or_create_worktree(path!("/tmp"), true, cx)
2926            })
2927            .await
2928            .unwrap();
2929
2930        let connection = Rc::new(FakeAgentConnection::new());
2931
2932        let thread = cx
2933            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2934            .await
2935            .unwrap();
2936
2937        // Out of project file
2938        let err = thread
2939            .update(cx, |thread, cx| {
2940                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2941            })
2942            .await
2943            .unwrap_err();
2944
2945        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2946    }
2947
    // Walks one tool call through InProgress -> Canceled -> Completed: cancelling the thread
    // marks the in-progress tool call as canceled, and a later `ToolCallUpdate` from the agent
    // with status Completed still moves it to Completed.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        // Shared id so the test body can later address the tool call the agent created.
        let id = acp::ToolCallId("test".into());

        // The fake agent reports a single in-progress fetch tool call and then ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let id = id.clone();
            move |_, thread, mut cx| {
                let id = id.clone();
                async move {
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(acp::ToolCall {
                                    id: id.clone(),
                                    title: "Label".into(),
                                    kind: acp::ToolKind::Fetch,
                                    status: acp::ToolCallStatus::InProgress,
                                    content: vec![],
                                    locations: vec![],
                                    raw_input: None,
                                    raw_output: None,
                                    meta: None,
                                }),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                        meta: None,
                    })
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Kick off the prompt without awaiting it so the test can act while it is in flight.
        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        // NOTE(review): presumably waits until the thread contains its first tool call entry;
        // confirm against the helper's definition.
        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; entry 1 is the tool call, still in progress.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::InProgress,
                    ..
                })
            ));
        });

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        // After cancelling, the tool call must be marked as canceled.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // The agent reports completion for the already-canceled tool call.
        thread
            .update(cx, |thread, cx| {
                thread.handle_session_update(
                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
                        id,
                        fields: acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..Default::default()
                        },
                        meta: None,
                    }),
                    cx,
                )
            })
            .unwrap();

        request.await.unwrap();

        // The late completion wins: the tool call ends up Completed.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Completed,
                    ..
                })
            ));
        });
    }
3050
3051    #[gpui::test]
3052    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3053        init_test(cx);
3054        let fs = FakeFs::new(cx.background_executor.clone());
3055        fs.insert_tree(path!("/test"), json!({})).await;
3056        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3057
3058        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3059            move |_, thread, mut cx| {
3060                async move {
3061                    thread
3062                        .update(&mut cx, |thread, cx| {
3063                            thread.handle_session_update(
3064                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3065                                    id: acp::ToolCallId("test".into()),
3066                                    title: "Label".into(),
3067                                    kind: acp::ToolKind::Edit,
3068                                    status: acp::ToolCallStatus::Completed,
3069                                    content: vec![acp::ToolCallContent::Diff {
3070                                        diff: acp::Diff {
3071                                            path: "/test/test.txt".into(),
3072                                            old_text: None,
3073                                            new_text: "foo".into(),
3074                                            meta: None,
3075                                        },
3076                                    }],
3077                                    locations: vec![],
3078                                    raw_input: None,
3079                                    raw_output: None,
3080                                    meta: None,
3081                                }),
3082                                cx,
3083                            )
3084                        })
3085                        .unwrap()
3086                        .unwrap();
3087                    Ok(acp::PromptResponse {
3088                        stop_reason: acp::StopReason::EndTurn,
3089                        meta: None,
3090                    })
3091                }
3092                .boxed_local()
3093            }
3094        }));
3095
3096        let thread = cx
3097            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3098            .await
3099            .unwrap();
3100
3101        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3102            .await
3103            .unwrap();
3104
3105        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3106    }
3107
    // End-to-end checkpoint scenario: user messages whose turn changed the file system are
    // rendered with a "(checkpoint)" marker, messages whose turn changed nothing are not, and
    // restoring a checkpoint truncates the conversation and removes files created afterwards.
    // Runs for 10 iterations, as requested by the test attribute.
    #[gpui::test(iterations = 10)]
    async fn test_checkpoints(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        // NOTE(review): the empty `.git` directory presumably makes the fake project look like a
        // git repository so checkpoints can be taken; confirm against FakeFs.
        fs.insert_tree(
            path!("/test"),
            json!({
                ".git": {}
            }),
        )
        .await;
        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

        // `simulate_changes` toggles whether the fake agent writes a new file on each prompt;
        // `next_filename` numbers those files (file-0, file-1, ...).
        let simulate_changes = Arc::new(AtomicBool::new(true));
        let next_filename = Arc::new(AtomicUsize::new(0));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            let fs = fs.clone();
            move |request, thread, mut cx| {
                let fs = fs.clone();
                let simulate_changes = simulate_changes.clone();
                let next_filename = next_filename.clone();
                async move {
                    // Optionally create a fresh empty file to simulate the agent changing the
                    // working tree during this turn.
                    if simulate_changes.load(SeqCst) {
                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                        fs.write(Path::new(&filename), b"").await?;
                    }

                    // Reply with the upper-cased text of the first prompt block.
                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                        panic!("expected text content block");
                    };
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
                                    content: content.text.to_uppercase().into(),
                                    meta: None,
                                }),
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                        meta: None,
                    })
                }
                .boxed_local()
            }
        }));
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // First turn: the agent creates file-0, so the user message shows a checkpoint.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

        // Second turn: the agent creates file-1, so this message shows a checkpoint as well.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Checkpoint isn't stored when there are no changes.
        simulate_changes.store(false, SeqCst);
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                    ## User

                    dolor

                    ## Assistant

                    DOLOR

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Rewinding the conversation truncates the history and restores the checkpoint.
        // Entry 2 is the second user message ("ipsum"); restoring it drops that message and
        // everything after it, and removes file-1.
        thread
            .update(cx, |thread, cx| {
                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                    panic!("unexpected entries {:?}", thread.entries)
                };
                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
            })
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
    }
3289
3290    #[gpui::test]
3291    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3292        use std::sync::atomic::AtomicUsize;
3293        init_test(cx);
3294
3295        let fs = FakeFs::new(cx.executor());
3296        let project = Project::test(fs, None, cx).await;
3297
3298        // Create a connection that simulates refusal after tool result
3299        let prompt_count = Arc::new(AtomicUsize::new(0));
3300        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3301            let prompt_count = prompt_count.clone();
3302            move |_request, thread, mut cx| {
3303                let count = prompt_count.fetch_add(1, SeqCst);
3304                async move {
3305                    if count == 0 {
3306                        // First prompt: Generate a tool call with result
3307                        thread.update(&mut cx, |thread, cx| {
3308                            thread
3309                                .handle_session_update(
3310                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3311                                        id: acp::ToolCallId("tool1".into()),
3312                                        title: "Test Tool".into(),
3313                                        kind: acp::ToolKind::Fetch,
3314                                        status: acp::ToolCallStatus::Completed,
3315                                        content: vec![],
3316                                        locations: vec![],
3317                                        raw_input: Some(serde_json::json!({"query": "test"})),
3318                                        raw_output: Some(
3319                                            serde_json::json!({"result": "inappropriate content"}),
3320                                        ),
3321                                        meta: None,
3322                                    }),
3323                                    cx,
3324                                )
3325                                .unwrap();
3326                        })?;
3327
3328                        // Now return refusal because of the tool result
3329                        Ok(acp::PromptResponse {
3330                            stop_reason: acp::StopReason::Refusal,
3331                            meta: None,
3332                        })
3333                    } else {
3334                        Ok(acp::PromptResponse {
3335                            stop_reason: acp::StopReason::EndTurn,
3336                            meta: None,
3337                        })
3338                    }
3339                }
3340                .boxed_local()
3341            }
3342        }));
3343
3344        let thread = cx
3345            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3346            .await
3347            .unwrap();
3348
3349        // Track if we see a Refusal event
3350        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3351        let saw_refusal_event_captured = saw_refusal_event.clone();
3352        thread.update(cx, |_thread, cx| {
3353            cx.subscribe(
3354                &thread,
3355                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3356                    if matches!(event, AcpThreadEvent::Refusal) {
3357                        *saw_refusal_event_captured.lock().unwrap() = true;
3358                    }
3359                },
3360            )
3361            .detach();
3362        });
3363
3364        // Send a user message - this will trigger tool call and then refusal
3365        let send_task = thread.update(cx, |thread, cx| {
3366            thread.send(
3367                vec![acp::ContentBlock::Text(acp::TextContent {
3368                    text: "Hello".into(),
3369                    annotations: None,
3370                    meta: None,
3371                })],
3372                cx,
3373            )
3374        });
3375        cx.background_executor.spawn(send_task).detach();
3376        cx.run_until_parked();
3377
3378        // Verify that:
3379        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3380        // 2. The user message was NOT truncated
3381        assert!(
3382            *saw_refusal_event.lock().unwrap(),
3383            "Refusal event should be emitted for tool result refusals"
3384        );
3385
3386        thread.read_with(cx, |thread, _| {
3387            let entries = thread.entries();
3388            assert!(entries.len() >= 2, "Should have user message and tool call");
3389
3390            // Verify user message is still there
3391            assert!(
3392                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3393                "User message should not be truncated"
3394            );
3395
3396            // Verify tool call is there with result
3397            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3398                assert!(
3399                    tool_call.raw_output.is_some(),
3400                    "Tool call should have output"
3401                );
3402            } else {
3403                panic!("Expected tool call at index 1");
3404            }
3405        });
3406    }
3407
3408    #[gpui::test]
3409    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3410        init_test(cx);
3411
3412        let fs = FakeFs::new(cx.executor());
3413        let project = Project::test(fs, None, cx).await;
3414
3415        let refuse_next = Arc::new(AtomicBool::new(false));
3416        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3417            let refuse_next = refuse_next.clone();
3418            move |_request, _thread, _cx| {
3419                if refuse_next.load(SeqCst) {
3420                    async move {
3421                        Ok(acp::PromptResponse {
3422                            stop_reason: acp::StopReason::Refusal,
3423                            meta: None,
3424                        })
3425                    }
3426                    .boxed_local()
3427                } else {
3428                    async move {
3429                        Ok(acp::PromptResponse {
3430                            stop_reason: acp::StopReason::EndTurn,
3431                            meta: None,
3432                        })
3433                    }
3434                    .boxed_local()
3435                }
3436            }
3437        }));
3438
3439        let thread = cx
3440            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3441            .await
3442            .unwrap();
3443
3444        // Track if we see a Refusal event
3445        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3446        let saw_refusal_event_captured = saw_refusal_event.clone();
3447        thread.update(cx, |_thread, cx| {
3448            cx.subscribe(
3449                &thread,
3450                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3451                    if matches!(event, AcpThreadEvent::Refusal) {
3452                        *saw_refusal_event_captured.lock().unwrap() = true;
3453                    }
3454                },
3455            )
3456            .detach();
3457        });
3458
3459        // Send a message that will be refused
3460        refuse_next.store(true, SeqCst);
3461        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3462            .await
3463            .unwrap();
3464
3465        // Verify that a Refusal event WAS emitted for user prompt refusal
3466        assert!(
3467            *saw_refusal_event.lock().unwrap(),
3468            "Refusal event should be emitted for user prompt refusals"
3469        );
3470
3471        // Verify the message was truncated (user prompt refusal)
3472        thread.read_with(cx, |thread, cx| {
3473            assert_eq!(thread.to_markdown(cx), "");
3474        });
3475    }
3476
3477    #[gpui::test]
3478    async fn test_refusal(cx: &mut TestAppContext) {
3479        init_test(cx);
3480        let fs = FakeFs::new(cx.background_executor.clone());
3481        fs.insert_tree(path!("/"), json!({})).await;
3482        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3483
3484        let refuse_next = Arc::new(AtomicBool::new(false));
3485        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3486            let refuse_next = refuse_next.clone();
3487            move |request, thread, mut cx| {
3488                let refuse_next = refuse_next.clone();
3489                async move {
3490                    if refuse_next.load(SeqCst) {
3491                        return Ok(acp::PromptResponse {
3492                            stop_reason: acp::StopReason::Refusal,
3493                            meta: None,
3494                        });
3495                    }
3496
3497                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3498                        panic!("expected text content block");
3499                    };
3500                    thread.update(&mut cx, |thread, cx| {
3501                        thread
3502                            .handle_session_update(
3503                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3504                                    content: content.text.to_uppercase().into(),
3505                                    meta: None,
3506                                }),
3507                                cx,
3508                            )
3509                            .unwrap();
3510                    })?;
3511                    Ok(acp::PromptResponse {
3512                        stop_reason: acp::StopReason::EndTurn,
3513                        meta: None,
3514                    })
3515                }
3516                .boxed_local()
3517            }
3518        }));
3519        let thread = cx
3520            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3521            .await
3522            .unwrap();
3523
3524        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3525            .await
3526            .unwrap();
3527        thread.read_with(cx, |thread, cx| {
3528            assert_eq!(
3529                thread.to_markdown(cx),
3530                indoc! {"
3531                    ## User
3532
3533                    hello
3534
3535                    ## Assistant
3536
3537                    HELLO
3538
3539                "}
3540            );
3541        });
3542
3543        // Simulate refusing the second message. The message should be truncated
3544        // when a user prompt is refused.
3545        refuse_next.store(true, SeqCst);
3546        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3547            .await
3548            .unwrap();
3549        thread.read_with(cx, |thread, cx| {
3550            assert_eq!(
3551                thread.to_markdown(cx),
3552                indoc! {"
3553                    ## User
3554
3555                    hello
3556
3557                    ## Assistant
3558
3559                    HELLO
3560
3561                "}
3562            );
3563        });
3564    }
3565
3566    async fn run_until_first_tool_call(
3567        thread: &Entity<AcpThread>,
3568        cx: &mut TestAppContext,
3569    ) -> usize {
3570        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3571
3572        let subscription = cx.update(|cx| {
3573            cx.subscribe(thread, move |thread, _, cx| {
3574                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3575                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3576                        return tx.try_send(ix).unwrap();
3577                    }
3578                }
3579            })
3580        });
3581
3582        select! {
3583            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3584                panic!("Timeout waiting for tool call")
3585            }
3586            ix = rx.next().fuse() => {
3587                drop(subscription);
3588                ix.unwrap()
3589            }
3590        }
3591    }
3592
3593    #[derive(Clone, Default)]
3594    struct FakeAgentConnection {
3595        auth_methods: Vec<acp::AuthMethod>,
3596        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3597        on_user_message: Option<
3598            Rc<
3599                dyn Fn(
3600                        acp::PromptRequest,
3601                        WeakEntity<AcpThread>,
3602                        AsyncApp,
3603                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3604                    + 'static,
3605            >,
3606        >,
3607    }
3608
3609    impl FakeAgentConnection {
3610        fn new() -> Self {
3611            Self {
3612                auth_methods: Vec::new(),
3613                on_user_message: None,
3614                sessions: Arc::default(),
3615            }
3616        }
3617
3618        #[expect(unused)]
3619        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3620            self.auth_methods = auth_methods;
3621            self
3622        }
3623
3624        fn on_user_message(
3625            mut self,
3626            handler: impl Fn(
3627                acp::PromptRequest,
3628                WeakEntity<AcpThread>,
3629                AsyncApp,
3630            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3631            + 'static,
3632        ) -> Self {
3633            self.on_user_message.replace(Rc::new(handler));
3634            self
3635        }
3636    }
3637
3638    impl AgentConnection for FakeAgentConnection {
3639        fn telemetry_id(&self) -> &'static str {
3640            "fake"
3641        }
3642
3643        fn auth_methods(&self) -> &[acp::AuthMethod] {
3644            &self.auth_methods
3645        }
3646
3647        fn new_thread(
3648            self: Rc<Self>,
3649            project: Entity<Project>,
3650            _cwd: &Path,
3651            cx: &mut App,
3652        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3653            let session_id = acp::SessionId(
3654                rand::rng()
3655                    .sample_iter(&distr::Alphanumeric)
3656                    .take(7)
3657                    .map(char::from)
3658                    .collect::<String>()
3659                    .into(),
3660            );
3661            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3662            let thread = cx.new(|cx| {
3663                AcpThread::new(
3664                    "Test",
3665                    self.clone(),
3666                    project,
3667                    action_log,
3668                    session_id.clone(),
3669                    watch::Receiver::constant(acp::PromptCapabilities {
3670                        image: true,
3671                        audio: true,
3672                        embedded_context: true,
3673                        meta: None,
3674                    }),
3675                    cx,
3676                )
3677            });
3678            self.sessions.lock().insert(session_id, thread.downgrade());
3679            Task::ready(Ok(thread))
3680        }
3681
3682        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3683            if self.auth_methods().iter().any(|m| m.id == method) {
3684                Task::ready(Ok(()))
3685            } else {
3686                Task::ready(Err(anyhow!("Invalid Auth Method")))
3687            }
3688        }
3689
3690        fn prompt(
3691            &self,
3692            _id: Option<UserMessageId>,
3693            params: acp::PromptRequest,
3694            cx: &mut App,
3695        ) -> Task<gpui::Result<acp::PromptResponse>> {
3696            let sessions = self.sessions.lock();
3697            let thread = sessions.get(&params.session_id).unwrap();
3698            if let Some(handler) = &self.on_user_message {
3699                let handler = handler.clone();
3700                let thread = thread.clone();
3701                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3702            } else {
3703                Task::ready(Ok(acp::PromptResponse {
3704                    stop_reason: acp::StopReason::EndTurn,
3705                    meta: None,
3706                }))
3707            }
3708        }
3709
3710        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3711            let sessions = self.sessions.lock();
3712            let thread = sessions.get(session_id).unwrap().clone();
3713
3714            cx.spawn(async move |cx| {
3715                thread
3716                    .update(cx, |thread, cx| thread.cancel(cx))
3717                    .unwrap()
3718                    .await
3719            })
3720            .detach();
3721        }
3722
3723        fn truncate(
3724            &self,
3725            session_id: &acp::SessionId,
3726            _cx: &App,
3727        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3728            Some(Rc::new(FakeAgentSessionEditor {
3729                _session_id: session_id.clone(),
3730            }))
3731        }
3732
3733        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3734            self
3735        }
3736    }
3737
3738    struct FakeAgentSessionEditor {
3739        _session_id: acp::SessionId,
3740    }
3741
3742    impl AgentSessionTruncate for FakeAgentSessionEditor {
3743        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3744            Task::ready(Ok(()))
3745        }
3746    }
3747
3748    #[gpui::test]
3749    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3750        init_test(cx);
3751
3752        let fs = FakeFs::new(cx.executor());
3753        let project = Project::test(fs, [], cx).await;
3754        let connection = Rc::new(FakeAgentConnection::new());
3755        let thread = cx
3756            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3757            .await
3758            .unwrap();
3759
3760        // Try to update a tool call that doesn't exist
3761        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3762        thread.update(cx, |thread, cx| {
3763            let result = thread.handle_session_update(
3764                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3765                    id: nonexistent_id.clone(),
3766                    fields: acp::ToolCallUpdateFields {
3767                        status: Some(acp::ToolCallStatus::Completed),
3768                        ..Default::default()
3769                    },
3770                    meta: None,
3771                }),
3772                cx,
3773            );
3774
3775            // The update should succeed (not return an error)
3776            assert!(result.is_ok());
3777
3778            // There should now be exactly one entry in the thread
3779            assert_eq!(thread.entries.len(), 1);
3780
3781            // The entry should be a failed tool call
3782            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3783                assert_eq!(tool_call.id, nonexistent_id);
3784                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3785                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3786
3787                // Check that the content contains the error message
3788                assert_eq!(tool_call.content.len(), 1);
3789                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3790                    match content_block {
3791                        ContentBlock::Markdown { markdown } => {
3792                            let markdown_text = markdown.read(cx).source();
3793                            assert!(markdown_text.contains("Tool call not found"));
3794                        }
3795                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3796                        ContentBlock::ResourceLink { .. } => {
3797                            panic!("Expected markdown content, got resource link")
3798                        }
3799                    }
3800                } else {
3801                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3802                }
3803            } else {
3804                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3805            }
3806        });
3807    }
3808}