1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use ::terminal::terminal_settings::TerminalSettings;
   7use agent_settings::AgentSettings;
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use language::language_settings::FormatOnSave;
  12pub use mention::*;
  13use project::lsp_store::{FormatTrigger, LspFormatTarget};
  14use serde::{Deserialize, Serialize};
  15use settings::{Settings as _, SettingsLocation};
  16use task::{Shell, ShellBuilder};
  17pub use terminal::*;
  18
  19use action_log::ActionLog;
  20use agent_client_protocol::{self as acp};
  21use anyhow::{Context as _, Result, anyhow};
  22use editor::Bias;
  23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  25use itertools::Itertools;
  26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  27use markdown::Markdown;
  28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  29use std::collections::HashMap;
  30use std::error::Error;
  31use std::fmt::{Formatter, Write};
  32use std::ops::Range;
  33use std::process::ExitStatus;
  34use std::rc::Rc;
  35use std::time::{Duration, Instant};
  36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  37use ui::App;
  38use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  39use uuid::Uuid;
  40
  41#[derive(Debug)]
  42pub struct UserMessage {
  43    pub id: Option<UserMessageId>,
  44    pub content: ContentBlock,
  45    pub chunks: Vec<acp::ContentBlock>,
  46    pub checkpoint: Option<Checkpoint>,
  47}
  48
  49#[derive(Debug)]
  50pub struct Checkpoint {
  51    git_checkpoint: GitStoreCheckpoint,
  52    pub show: bool,
  53}
  54
  55impl UserMessage {
  56    fn to_markdown(&self, cx: &App) -> String {
  57        let mut markdown = String::new();
  58        if self
  59            .checkpoint
  60            .as_ref()
  61            .is_some_and(|checkpoint| checkpoint.show)
  62        {
  63            writeln!(markdown, "## User (checkpoint)").unwrap();
  64        } else {
  65            writeln!(markdown, "## User").unwrap();
  66        }
  67        writeln!(markdown).unwrap();
  68        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  69        writeln!(markdown).unwrap();
  70        markdown
  71    }
  72}
  73
  74#[derive(Debug, PartialEq)]
  75pub struct AssistantMessage {
  76    pub chunks: Vec<AssistantMessageChunk>,
  77}
  78
  79impl AssistantMessage {
  80    pub fn to_markdown(&self, cx: &App) -> String {
  81        format!(
  82            "## Assistant\n\n{}\n\n",
  83            self.chunks
  84                .iter()
  85                .map(|chunk| chunk.to_markdown(cx))
  86                .join("\n\n")
  87        )
  88    }
  89}
  90
  91#[derive(Debug, PartialEq)]
  92pub enum AssistantMessageChunk {
  93    Message { block: ContentBlock },
  94    Thought { block: ContentBlock },
  95}
  96
  97impl AssistantMessageChunk {
  98    pub fn from_str(
  99        chunk: &str,
 100        language_registry: &Arc<LanguageRegistry>,
 101        path_style: PathStyle,
 102        cx: &mut App,
 103    ) -> Self {
 104        Self::Message {
 105            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 106        }
 107    }
 108
 109    fn to_markdown(&self, cx: &App) -> String {
 110        match self {
 111            Self::Message { block } => block.to_markdown(cx).to_string(),
 112            Self::Thought { block } => {
 113                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 114            }
 115        }
 116    }
 117}
 118
 119#[derive(Debug)]
 120pub enum AgentThreadEntry {
 121    UserMessage(UserMessage),
 122    AssistantMessage(AssistantMessage),
 123    ToolCall(ToolCall),
 124}
 125
 126impl AgentThreadEntry {
 127    pub fn to_markdown(&self, cx: &App) -> String {
 128        match self {
 129            Self::UserMessage(message) => message.to_markdown(cx),
 130            Self::AssistantMessage(message) => message.to_markdown(cx),
 131            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 132        }
 133    }
 134
 135    pub fn user_message(&self) -> Option<&UserMessage> {
 136        if let AgentThreadEntry::UserMessage(message) = self {
 137            Some(message)
 138        } else {
 139            None
 140        }
 141    }
 142
 143    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 144        if let AgentThreadEntry::ToolCall(call) = self {
 145            itertools::Either::Left(call.diffs())
 146        } else {
 147            itertools::Either::Right(std::iter::empty())
 148        }
 149    }
 150
 151    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 152        if let AgentThreadEntry::ToolCall(call) = self {
 153            itertools::Either::Left(call.terminals())
 154        } else {
 155            itertools::Either::Right(std::iter::empty())
 156        }
 157    }
 158
 159    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 160        if let AgentThreadEntry::ToolCall(ToolCall {
 161            locations,
 162            resolved_locations,
 163            ..
 164        }) = self
 165        {
 166            Some((
 167                locations.get(ix)?.clone(),
 168                resolved_locations.get(ix)?.clone()?,
 169            ))
 170        } else {
 171            None
 172        }
 173    }
 174}
 175
 176#[derive(Debug)]
 177pub struct ToolCall {
 178    pub id: acp::ToolCallId,
 179    pub label: Entity<Markdown>,
 180    pub kind: acp::ToolKind,
 181    pub content: Vec<ToolCallContent>,
 182    pub status: ToolCallStatus,
 183    pub locations: Vec<acp::ToolCallLocation>,
 184    pub resolved_locations: Vec<Option<AgentLocation>>,
 185    pub raw_input: Option<serde_json::Value>,
 186    pub raw_output: Option<serde_json::Value>,
 187}
 188
 189impl ToolCall {
 190    fn from_acp(
 191        tool_call: acp::ToolCall,
 192        status: ToolCallStatus,
 193        language_registry: Arc<LanguageRegistry>,
 194        path_style: PathStyle,
 195        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 196        cx: &mut App,
 197    ) -> Result<Self> {
 198        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 199            first_line.to_owned() + "…"
 200        } else {
 201            tool_call.title
 202        };
 203        let mut content = Vec::with_capacity(tool_call.content.len());
 204        for item in tool_call.content {
 205            content.push(ToolCallContent::from_acp(
 206                item,
 207                language_registry.clone(),
 208                path_style,
 209                terminals,
 210                cx,
 211            )?);
 212        }
 213
 214        let result = Self {
 215            id: tool_call.id,
 216            label: cx
 217                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 218            kind: tool_call.kind,
 219            content,
 220            locations: tool_call.locations,
 221            resolved_locations: Vec::default(),
 222            status,
 223            raw_input: tool_call.raw_input,
 224            raw_output: tool_call.raw_output,
 225        };
 226        Ok(result)
 227    }
 228
 229    fn update_fields(
 230        &mut self,
 231        fields: acp::ToolCallUpdateFields,
 232        language_registry: Arc<LanguageRegistry>,
 233        path_style: PathStyle,
 234        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 235        cx: &mut App,
 236    ) -> Result<()> {
 237        let acp::ToolCallUpdateFields {
 238            kind,
 239            status,
 240            title,
 241            content,
 242            locations,
 243            raw_input,
 244            raw_output,
 245        } = fields;
 246
 247        if let Some(kind) = kind {
 248            self.kind = kind;
 249        }
 250
 251        if let Some(status) = status {
 252            self.status = status.into();
 253        }
 254
 255        if let Some(title) = title {
 256            self.label.update(cx, |label, cx| {
 257                if let Some((first_line, _)) = title.split_once("\n") {
 258                    label.replace(first_line.to_owned() + "…", cx)
 259                } else {
 260                    label.replace(title, cx);
 261                }
 262            });
 263        }
 264
 265        if let Some(content) = content {
 266            let new_content_len = content.len();
 267            let mut content = content.into_iter();
 268
 269            // Reuse existing content if we can
 270            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 271                old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 272            }
 273            for new in content {
 274                self.content.push(ToolCallContent::from_acp(
 275                    new,
 276                    language_registry.clone(),
 277                    path_style,
 278                    terminals,
 279                    cx,
 280                )?)
 281            }
 282            self.content.truncate(new_content_len);
 283        }
 284
 285        if let Some(locations) = locations {
 286            self.locations = locations;
 287        }
 288
 289        if let Some(raw_input) = raw_input {
 290            self.raw_input = Some(raw_input);
 291        }
 292
 293        if let Some(raw_output) = raw_output {
 294            if self.content.is_empty()
 295                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 296            {
 297                self.content
 298                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 299                        markdown,
 300                    }));
 301            }
 302            self.raw_output = Some(raw_output);
 303        }
 304        Ok(())
 305    }
 306
 307    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 308        self.content.iter().filter_map(|content| match content {
 309            ToolCallContent::Diff(diff) => Some(diff),
 310            ToolCallContent::ContentBlock(_) => None,
 311            ToolCallContent::Terminal(_) => None,
 312        })
 313    }
 314
 315    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 316        self.content.iter().filter_map(|content| match content {
 317            ToolCallContent::Terminal(terminal) => Some(terminal),
 318            ToolCallContent::ContentBlock(_) => None,
 319            ToolCallContent::Diff(_) => None,
 320        })
 321    }
 322
 323    fn to_markdown(&self, cx: &App) -> String {
 324        let mut markdown = format!(
 325            "**Tool Call: {}**\nStatus: {}\n\n",
 326            self.label.read(cx).source(),
 327            self.status
 328        );
 329        for content in &self.content {
 330            markdown.push_str(content.to_markdown(cx).as_str());
 331            markdown.push_str("\n\n");
 332        }
 333        markdown
 334    }
 335
 336    async fn resolve_location(
 337        location: acp::ToolCallLocation,
 338        project: WeakEntity<Project>,
 339        cx: &mut AsyncApp,
 340    ) -> Option<ResolvedLocation> {
 341        let buffer = project
 342            .update(cx, |project, cx| {
 343                project
 344                    .project_path_for_absolute_path(&location.path, cx)
 345                    .map(|path| project.open_buffer(path, cx))
 346            })
 347            .ok()??;
 348        let buffer = buffer.await.log_err()?;
 349        let position = buffer
 350            .update(cx, |buffer, _| {
 351                if let Some(row) = location.line {
 352                    let snapshot = buffer.snapshot();
 353                    let column = snapshot.indent_size_for_line(row).len;
 354                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 355                    snapshot.anchor_before(point)
 356                } else {
 357                    Anchor::MIN
 358                }
 359            })
 360            .ok()?;
 361
 362        Some(ResolvedLocation { buffer, position })
 363    }
 364
 365    fn resolve_locations(
 366        &self,
 367        project: Entity<Project>,
 368        cx: &mut App,
 369    ) -> Task<Vec<Option<ResolvedLocation>>> {
 370        let locations = self.locations.clone();
 371        project.update(cx, |_, cx| {
 372            cx.spawn(async move |project, cx| {
 373                let mut new_locations = Vec::new();
 374                for location in locations {
 375                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 376                }
 377                new_locations
 378            })
 379        })
 380    }
 381}
 382
 383// Separate so we can hold a strong reference to the buffer
 384// for saving on the thread
 385#[derive(Clone, Debug, PartialEq, Eq)]
 386struct ResolvedLocation {
 387    buffer: Entity<Buffer>,
 388    position: Anchor,
 389}
 390
 391impl From<&ResolvedLocation> for AgentLocation {
 392    fn from(value: &ResolvedLocation) -> Self {
 393        Self {
 394            buffer: value.buffer.downgrade(),
 395            position: value.position,
 396        }
 397    }
 398}
 399
 400#[derive(Debug)]
 401pub enum ToolCallStatus {
 402    /// The tool call hasn't started running yet, but we start showing it to
 403    /// the user.
 404    Pending,
 405    /// The tool call is waiting for confirmation from the user.
 406    WaitingForConfirmation {
 407        options: Vec<acp::PermissionOption>,
 408        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 409    },
 410    /// The tool call is currently running.
 411    InProgress,
 412    /// The tool call completed successfully.
 413    Completed,
 414    /// The tool call failed.
 415    Failed,
 416    /// The user rejected the tool call.
 417    Rejected,
 418    /// The user canceled generation so the tool call was canceled.
 419    Canceled,
 420}
 421
 422impl From<acp::ToolCallStatus> for ToolCallStatus {
 423    fn from(status: acp::ToolCallStatus) -> Self {
 424        match status {
 425            acp::ToolCallStatus::Pending => Self::Pending,
 426            acp::ToolCallStatus::InProgress => Self::InProgress,
 427            acp::ToolCallStatus::Completed => Self::Completed,
 428            acp::ToolCallStatus::Failed => Self::Failed,
 429        }
 430    }
 431}
 432
 433impl Display for ToolCallStatus {
 434    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 435        write!(
 436            f,
 437            "{}",
 438            match self {
 439                ToolCallStatus::Pending => "Pending",
 440                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 441                ToolCallStatus::InProgress => "In Progress",
 442                ToolCallStatus::Completed => "Completed",
 443                ToolCallStatus::Failed => "Failed",
 444                ToolCallStatus::Rejected => "Rejected",
 445                ToolCallStatus::Canceled => "Canceled",
 446            }
 447        )
 448    }
 449}
 450
 451#[derive(Debug, PartialEq, Clone)]
 452pub enum ContentBlock {
 453    Empty,
 454    Markdown { markdown: Entity<Markdown> },
 455    ResourceLink { resource_link: acp::ResourceLink },
 456}
 457
 458impl ContentBlock {
 459    pub fn new(
 460        block: acp::ContentBlock,
 461        language_registry: &Arc<LanguageRegistry>,
 462        path_style: PathStyle,
 463        cx: &mut App,
 464    ) -> Self {
 465        let mut this = Self::Empty;
 466        this.append(block, language_registry, path_style, cx);
 467        this
 468    }
 469
 470    pub fn new_combined(
 471        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 472        language_registry: Arc<LanguageRegistry>,
 473        path_style: PathStyle,
 474        cx: &mut App,
 475    ) -> Self {
 476        let mut this = Self::Empty;
 477        for block in blocks {
 478            this.append(block, &language_registry, path_style, cx);
 479        }
 480        this
 481    }
 482
 483    pub fn append(
 484        &mut self,
 485        block: acp::ContentBlock,
 486        language_registry: &Arc<LanguageRegistry>,
 487        path_style: PathStyle,
 488        cx: &mut App,
 489    ) {
 490        if matches!(self, ContentBlock::Empty)
 491            && let acp::ContentBlock::ResourceLink(resource_link) = block
 492        {
 493            *self = ContentBlock::ResourceLink { resource_link };
 494            return;
 495        }
 496
 497        let new_content = self.block_string_contents(block, path_style);
 498
 499        match self {
 500            ContentBlock::Empty => {
 501                *self = Self::create_markdown_block(new_content, language_registry, cx);
 502            }
 503            ContentBlock::Markdown { markdown } => {
 504                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 505            }
 506            ContentBlock::ResourceLink { resource_link } => {
 507                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 508                let combined = format!("{}\n{}", existing_content, new_content);
 509
 510                *self = Self::create_markdown_block(combined, language_registry, cx);
 511            }
 512        }
 513    }
 514
 515    fn create_markdown_block(
 516        content: String,
 517        language_registry: &Arc<LanguageRegistry>,
 518        cx: &mut App,
 519    ) -> ContentBlock {
 520        ContentBlock::Markdown {
 521            markdown: cx
 522                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 523        }
 524    }
 525
 526    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 527        match block {
 528            acp::ContentBlock::Text(text_content) => text_content.text,
 529            acp::ContentBlock::ResourceLink(resource_link) => {
 530                Self::resource_link_md(&resource_link.uri, path_style)
 531            }
 532            acp::ContentBlock::Resource(acp::EmbeddedResource {
 533                resource:
 534                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 535                        uri,
 536                        ..
 537                    }),
 538                ..
 539            }) => Self::resource_link_md(&uri, path_style),
 540            acp::ContentBlock::Image(image) => Self::image_md(&image),
 541            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 542        }
 543    }
 544
 545    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 546        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 547            uri.as_link().to_string()
 548        } else {
 549            uri.to_string()
 550        }
 551    }
 552
 553    fn image_md(_image: &acp::ImageContent) -> String {
 554        "`Image`".into()
 555    }
 556
 557    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 558        match self {
 559            ContentBlock::Empty => "",
 560            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 561            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 562        }
 563    }
 564
 565    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 566        match self {
 567            ContentBlock::Empty => None,
 568            ContentBlock::Markdown { markdown } => Some(markdown),
 569            ContentBlock::ResourceLink { .. } => None,
 570        }
 571    }
 572
 573    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 574        match self {
 575            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 576            _ => None,
 577        }
 578    }
 579}
 580
 581#[derive(Debug)]
 582pub enum ToolCallContent {
 583    ContentBlock(ContentBlock),
 584    Diff(Entity<Diff>),
 585    Terminal(Entity<Terminal>),
 586}
 587
 588impl ToolCallContent {
 589    pub fn from_acp(
 590        content: acp::ToolCallContent,
 591        language_registry: Arc<LanguageRegistry>,
 592        path_style: PathStyle,
 593        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 594        cx: &mut App,
 595    ) -> Result<Self> {
 596        match content {
 597            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 598                content,
 599                &language_registry,
 600                path_style,
 601                cx,
 602            ))),
 603            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 604                Diff::finalized(
 605                    diff.path.to_string_lossy().into_owned(),
 606                    diff.old_text,
 607                    diff.new_text,
 608                    language_registry,
 609                    cx,
 610                )
 611            }))),
 612            acp::ToolCallContent::Terminal { terminal_id } => terminals
 613                .get(&terminal_id)
 614                .cloned()
 615                .map(Self::Terminal)
 616                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 617        }
 618    }
 619
 620    pub fn update_from_acp(
 621        &mut self,
 622        new: acp::ToolCallContent,
 623        language_registry: Arc<LanguageRegistry>,
 624        path_style: PathStyle,
 625        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 626        cx: &mut App,
 627    ) -> Result<()> {
 628        let needs_update = match (&self, &new) {
 629            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 630                old_diff.read(cx).needs_update(
 631                    new_diff.old_text.as_deref().unwrap_or(""),
 632                    &new_diff.new_text,
 633                    cx,
 634                )
 635            }
 636            _ => true,
 637        };
 638
 639        if needs_update {
 640            *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
 641        }
 642        Ok(())
 643    }
 644
 645    pub fn to_markdown(&self, cx: &App) -> String {
 646        match self {
 647            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 648            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 649            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 650        }
 651    }
 652}
 653
 654#[derive(Debug, PartialEq)]
 655pub enum ToolCallUpdate {
 656    UpdateFields(acp::ToolCallUpdate),
 657    UpdateDiff(ToolCallUpdateDiff),
 658    UpdateTerminal(ToolCallUpdateTerminal),
 659}
 660
 661impl ToolCallUpdate {
 662    fn id(&self) -> &acp::ToolCallId {
 663        match self {
 664            Self::UpdateFields(update) => &update.id,
 665            Self::UpdateDiff(diff) => &diff.id,
 666            Self::UpdateTerminal(terminal) => &terminal.id,
 667        }
 668    }
 669}
 670
 671impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 672    fn from(update: acp::ToolCallUpdate) -> Self {
 673        Self::UpdateFields(update)
 674    }
 675}
 676
 677impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 678    fn from(diff: ToolCallUpdateDiff) -> Self {
 679        Self::UpdateDiff(diff)
 680    }
 681}
 682
 683#[derive(Debug, PartialEq)]
 684pub struct ToolCallUpdateDiff {
 685    pub id: acp::ToolCallId,
 686    pub diff: Entity<Diff>,
 687}
 688
 689impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 690    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 691        Self::UpdateTerminal(terminal)
 692    }
 693}
 694
 695#[derive(Debug, PartialEq)]
 696pub struct ToolCallUpdateTerminal {
 697    pub id: acp::ToolCallId,
 698    pub terminal: Entity<Terminal>,
 699}
 700
 701#[derive(Debug, Default)]
 702pub struct Plan {
 703    pub entries: Vec<PlanEntry>,
 704}
 705
 706#[derive(Debug)]
 707pub struct PlanStats<'a> {
 708    pub in_progress_entry: Option<&'a PlanEntry>,
 709    pub pending: u32,
 710    pub completed: u32,
 711}
 712
 713impl Plan {
 714    pub fn is_empty(&self) -> bool {
 715        self.entries.is_empty()
 716    }
 717
 718    pub fn stats(&self) -> PlanStats<'_> {
 719        let mut stats = PlanStats {
 720            in_progress_entry: None,
 721            pending: 0,
 722            completed: 0,
 723        };
 724
 725        for entry in &self.entries {
 726            match &entry.status {
 727                acp::PlanEntryStatus::Pending => {
 728                    stats.pending += 1;
 729                }
 730                acp::PlanEntryStatus::InProgress => {
 731                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 732                }
 733                acp::PlanEntryStatus::Completed => {
 734                    stats.completed += 1;
 735                }
 736            }
 737        }
 738
 739        stats
 740    }
 741}
 742
 743#[derive(Debug)]
 744pub struct PlanEntry {
 745    pub content: Entity<Markdown>,
 746    pub priority: acp::PlanEntryPriority,
 747    pub status: acp::PlanEntryStatus,
 748}
 749
 750impl PlanEntry {
 751    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 752        Self {
 753            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 754            priority: entry.priority,
 755            status: entry.status,
 756        }
 757    }
 758}
 759
 760#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 761pub struct TokenUsage {
 762    pub max_tokens: u64,
 763    pub used_tokens: u64,
 764}
 765
 766impl TokenUsage {
 767    pub fn ratio(&self) -> TokenUsageRatio {
 768        #[cfg(debug_assertions)]
 769        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 770            .unwrap_or("0.8".to_string())
 771            .parse()
 772            .unwrap();
 773        #[cfg(not(debug_assertions))]
 774        let warning_threshold: f32 = 0.8;
 775
 776        // When the maximum is unknown because there is no selected model,
 777        // avoid showing the token limit warning.
 778        if self.max_tokens == 0 {
 779            TokenUsageRatio::Normal
 780        } else if self.used_tokens >= self.max_tokens {
 781            TokenUsageRatio::Exceeded
 782        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 783            TokenUsageRatio::Warning
 784        } else {
 785            TokenUsageRatio::Normal
 786        }
 787    }
 788}
 789
 790#[derive(Debug, Clone, PartialEq, Eq)]
 791pub enum TokenUsageRatio {
 792    Normal,
 793    Warning,
 794    Exceeded,
 795}
 796
 797#[derive(Debug, Clone)]
 798pub struct RetryStatus {
 799    pub last_error: SharedString,
 800    pub attempt: usize,
 801    pub max_attempts: usize,
 802    pub started_at: Instant,
 803    pub duration: Duration,
 804}
 805
 806pub struct AcpThread {
 807    title: SharedString,
 808    entries: Vec<AgentThreadEntry>,
 809    plan: Plan,
 810    project: Entity<Project>,
 811    action_log: Entity<ActionLog>,
 812    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 813    send_task: Option<Task<()>>,
 814    connection: Rc<dyn AgentConnection>,
 815    session_id: acp::SessionId,
 816    token_usage: Option<TokenUsage>,
 817    prompt_capabilities: acp::PromptCapabilities,
 818    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 819    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 820    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 821    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 822}
 823
 824#[derive(Debug)]
 825pub enum AcpThreadEvent {
 826    NewEntry,
 827    TitleUpdated,
 828    TokenUsageUpdated,
 829    EntryUpdated(usize),
 830    EntriesRemoved(Range<usize>),
 831    ToolAuthorizationRequired,
 832    Retry(RetryStatus),
 833    Stopped,
 834    Error,
 835    LoadError(LoadError),
 836    PromptCapabilitiesUpdated,
 837    Refusal,
 838    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
 839    ModeUpdated(acp::SessionModeId),
 840}
 841
 842impl EventEmitter<AcpThreadEvent> for AcpThread {}
 843
 844#[derive(Debug, Clone)]
 845pub enum TerminalProviderEvent {
 846    Created {
 847        terminal_id: acp::TerminalId,
 848        label: String,
 849        cwd: Option<PathBuf>,
 850        output_byte_limit: Option<u64>,
 851        terminal: Entity<::terminal::Terminal>,
 852    },
 853    Output {
 854        terminal_id: acp::TerminalId,
 855        data: Vec<u8>,
 856    },
 857    TitleChanged {
 858        terminal_id: acp::TerminalId,
 859        title: String,
 860    },
 861    Exit {
 862        terminal_id: acp::TerminalId,
 863        status: acp::TerminalExitStatus,
 864    },
 865}
 866
 867#[derive(Debug, Clone)]
 868pub enum TerminalProviderCommand {
 869    WriteInput {
 870        terminal_id: acp::TerminalId,
 871        bytes: Vec<u8>,
 872    },
 873    Resize {
 874        terminal_id: acp::TerminalId,
 875        cols: u16,
 876        rows: u16,
 877    },
 878    Close {
 879        terminal_id: acp::TerminalId,
 880    },
 881}
 882
 883impl AcpThread {
 884    pub fn on_terminal_provider_event(
 885        &mut self,
 886        event: TerminalProviderEvent,
 887        cx: &mut Context<Self>,
 888    ) {
 889        match event {
 890            TerminalProviderEvent::Created {
 891                terminal_id,
 892                label,
 893                cwd,
 894                output_byte_limit,
 895                terminal,
 896            } => {
 897                let entity = self.register_terminal_created(
 898                    terminal_id.clone(),
 899                    label,
 900                    cwd,
 901                    output_byte_limit,
 902                    terminal,
 903                    cx,
 904                );
 905
 906                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 907                    for data in chunks.drain(..) {
 908                        entity.update(cx, |term, cx| {
 909                            term.inner().update(cx, |inner, cx| {
 910                                inner.write_output(&data, cx);
 911                            })
 912                        });
 913                    }
 914                }
 915
 916                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 917                    entity.update(cx, |_term, cx| {
 918                        cx.notify();
 919                    });
 920                }
 921
 922                cx.notify();
 923            }
 924            TerminalProviderEvent::Output { terminal_id, data } => {
 925                if let Some(entity) = self.terminals.get(&terminal_id) {
 926                    entity.update(cx, |term, cx| {
 927                        term.inner().update(cx, |inner, cx| {
 928                            inner.write_output(&data, cx);
 929                        })
 930                    });
 931                } else {
 932                    self.pending_terminal_output
 933                        .entry(terminal_id)
 934                        .or_default()
 935                        .push(data);
 936                }
 937            }
 938            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 939                if let Some(entity) = self.terminals.get(&terminal_id) {
 940                    entity.update(cx, |term, cx| {
 941                        term.inner().update(cx, |inner, cx| {
 942                            inner.breadcrumb_text = title;
 943                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 944                        })
 945                    });
 946                }
 947            }
 948            TerminalProviderEvent::Exit {
 949                terminal_id,
 950                status,
 951            } => {
 952                if let Some(entity) = self.terminals.get(&terminal_id) {
 953                    entity.update(cx, |_term, cx| {
 954                        cx.notify();
 955                    });
 956                } else {
 957                    self.pending_terminal_exit.insert(terminal_id, status);
 958                }
 959            }
 960        }
 961    }
 962}
 963
 964#[derive(PartialEq, Eq, Debug)]
 965pub enum ThreadStatus {
 966    Idle,
 967    Generating,
 968}
 969
 970#[derive(Debug, Clone)]
 971pub enum LoadError {
 972    Unsupported {
 973        command: SharedString,
 974        current_version: SharedString,
 975        minimum_version: SharedString,
 976    },
 977    FailedToInstall(SharedString),
 978    Exited {
 979        status: ExitStatus,
 980    },
 981    Other(SharedString),
 982}
 983
 984impl Display for LoadError {
 985    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 986        match self {
 987            LoadError::Unsupported {
 988                command: path,
 989                current_version,
 990                minimum_version,
 991            } => {
 992                write!(
 993                    f,
 994                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
 995                )
 996            }
 997            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
 998            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
 999            LoadError::Other(msg) => write!(f, "{msg}"),
1000        }
1001    }
1002}
1003
1004impl Error for LoadError {}
1005
1006impl AcpThread {
1007    pub fn new(
1008        title: impl Into<SharedString>,
1009        connection: Rc<dyn AgentConnection>,
1010        project: Entity<Project>,
1011        action_log: Entity<ActionLog>,
1012        session_id: acp::SessionId,
1013        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1014        cx: &mut Context<Self>,
1015    ) -> Self {
1016        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1017        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1018            loop {
1019                let caps = prompt_capabilities_rx.recv().await?;
1020                this.update(cx, |this, cx| {
1021                    this.prompt_capabilities = caps;
1022                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1023                })?;
1024            }
1025        });
1026
1027        Self {
1028            action_log,
1029            shared_buffers: Default::default(),
1030            entries: Default::default(),
1031            plan: Default::default(),
1032            title: title.into(),
1033            project,
1034            send_task: None,
1035            connection,
1036            session_id,
1037            token_usage: None,
1038            prompt_capabilities,
1039            _observe_prompt_capabilities: task,
1040            terminals: HashMap::default(),
1041            pending_terminal_output: HashMap::default(),
1042            pending_terminal_exit: HashMap::default(),
1043        }
1044    }
1045
1046    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1047        self.prompt_capabilities.clone()
1048    }
1049
1050    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1051        &self.connection
1052    }
1053
1054    pub fn action_log(&self) -> &Entity<ActionLog> {
1055        &self.action_log
1056    }
1057
1058    pub fn project(&self) -> &Entity<Project> {
1059        &self.project
1060    }
1061
1062    pub fn title(&self) -> SharedString {
1063        self.title.clone()
1064    }
1065
1066    pub fn entries(&self) -> &[AgentThreadEntry] {
1067        &self.entries
1068    }
1069
1070    pub fn session_id(&self) -> &acp::SessionId {
1071        &self.session_id
1072    }
1073
1074    pub fn status(&self) -> ThreadStatus {
1075        if self.send_task.is_some() {
1076            ThreadStatus::Generating
1077        } else {
1078            ThreadStatus::Idle
1079        }
1080    }
1081
1082    pub fn token_usage(&self) -> Option<&TokenUsage> {
1083        self.token_usage.as_ref()
1084    }
1085
1086    pub fn has_pending_edit_tool_calls(&self) -> bool {
1087        for entry in self.entries.iter().rev() {
1088            match entry {
1089                AgentThreadEntry::UserMessage(_) => return false,
1090                AgentThreadEntry::ToolCall(
1091                    call @ ToolCall {
1092                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1093                        ..
1094                    },
1095                ) if call.diffs().next().is_some() => {
1096                    return true;
1097                }
1098                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1099            }
1100        }
1101
1102        false
1103    }
1104
1105    pub fn used_tools_since_last_user_message(&self) -> bool {
1106        for entry in self.entries.iter().rev() {
1107            match entry {
1108                AgentThreadEntry::UserMessage(..) => return false,
1109                AgentThreadEntry::AssistantMessage(..) => continue,
1110                AgentThreadEntry::ToolCall(..) => return true,
1111            }
1112        }
1113
1114        false
1115    }
1116
1117    pub fn handle_session_update(
1118        &mut self,
1119        update: acp::SessionUpdate,
1120        cx: &mut Context<Self>,
1121    ) -> Result<(), acp::Error> {
1122        match update {
1123            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1124                self.push_user_content_block(None, content, cx);
1125            }
1126            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1127                self.push_assistant_content_block(content, false, cx);
1128            }
1129            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1130                self.push_assistant_content_block(content, true, cx);
1131            }
1132            acp::SessionUpdate::ToolCall(tool_call) => {
1133                self.upsert_tool_call(tool_call, cx)?;
1134            }
1135            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1136                self.update_tool_call(tool_call_update, cx)?;
1137            }
1138            acp::SessionUpdate::Plan(plan) => {
1139                self.update_plan(plan, cx);
1140            }
1141            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1142                available_commands,
1143                ..
1144            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1145            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1146                current_mode_id,
1147                ..
1148            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1149        }
1150        Ok(())
1151    }
1152
1153    pub fn push_user_content_block(
1154        &mut self,
1155        message_id: Option<UserMessageId>,
1156        chunk: acp::ContentBlock,
1157        cx: &mut Context<Self>,
1158    ) {
1159        let language_registry = self.project.read(cx).languages().clone();
1160        let path_style = self.project.read(cx).path_style(cx);
1161        let entries_len = self.entries.len();
1162
1163        if let Some(last_entry) = self.entries.last_mut()
1164            && let AgentThreadEntry::UserMessage(UserMessage {
1165                id,
1166                content,
1167                chunks,
1168                ..
1169            }) = last_entry
1170        {
1171            *id = message_id.or(id.take());
1172            content.append(chunk.clone(), &language_registry, path_style, cx);
1173            chunks.push(chunk);
1174            let idx = entries_len - 1;
1175            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1176        } else {
1177            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1178            self.push_entry(
1179                AgentThreadEntry::UserMessage(UserMessage {
1180                    id: message_id,
1181                    content,
1182                    chunks: vec![chunk],
1183                    checkpoint: None,
1184                }),
1185                cx,
1186            );
1187        }
1188    }
1189
1190    pub fn push_assistant_content_block(
1191        &mut self,
1192        chunk: acp::ContentBlock,
1193        is_thought: bool,
1194        cx: &mut Context<Self>,
1195    ) {
1196        let language_registry = self.project.read(cx).languages().clone();
1197        let path_style = self.project.read(cx).path_style(cx);
1198        let entries_len = self.entries.len();
1199        if let Some(last_entry) = self.entries.last_mut()
1200            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1201        {
1202            let idx = entries_len - 1;
1203            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1204            match (chunks.last_mut(), is_thought) {
1205                (Some(AssistantMessageChunk::Message { block }), false)
1206                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1207                    block.append(chunk, &language_registry, path_style, cx)
1208                }
1209                _ => {
1210                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1211                    if is_thought {
1212                        chunks.push(AssistantMessageChunk::Thought { block })
1213                    } else {
1214                        chunks.push(AssistantMessageChunk::Message { block })
1215                    }
1216                }
1217            }
1218        } else {
1219            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1220            let chunk = if is_thought {
1221                AssistantMessageChunk::Thought { block }
1222            } else {
1223                AssistantMessageChunk::Message { block }
1224            };
1225
1226            self.push_entry(
1227                AgentThreadEntry::AssistantMessage(AssistantMessage {
1228                    chunks: vec![chunk],
1229                }),
1230                cx,
1231            );
1232        }
1233    }
1234
1235    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1236        self.entries.push(entry);
1237        cx.emit(AcpThreadEvent::NewEntry);
1238    }
1239
1240    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1241        self.connection.set_title(&self.session_id, cx).is_some()
1242    }
1243
1244    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1245        if title != self.title {
1246            self.title = title.clone();
1247            cx.emit(AcpThreadEvent::TitleUpdated);
1248            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1249                return set_title.run(title, cx);
1250            }
1251        }
1252        Task::ready(Ok(()))
1253    }
1254
1255    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1256        self.token_usage = usage;
1257        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1258    }
1259
1260    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1261        cx.emit(AcpThreadEvent::Retry(status));
1262    }
1263
1264    pub fn update_tool_call(
1265        &mut self,
1266        update: impl Into<ToolCallUpdate>,
1267        cx: &mut Context<Self>,
1268    ) -> Result<()> {
1269        let update = update.into();
1270        let languages = self.project.read(cx).languages().clone();
1271        let path_style = self.project.read(cx).path_style(cx);
1272
1273        let ix = match self.index_for_tool_call(update.id()) {
1274            Some(ix) => ix,
1275            None => {
1276                // Tool call not found - create a failed tool call entry
1277                let failed_tool_call = ToolCall {
1278                    id: update.id().clone(),
1279                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1280                    kind: acp::ToolKind::Fetch,
1281                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1282                        acp::ContentBlock::Text(acp::TextContent {
1283                            text: "Tool call not found".to_string(),
1284                            annotations: None,
1285                            meta: None,
1286                        }),
1287                        &languages,
1288                        path_style,
1289                        cx,
1290                    ))],
1291                    status: ToolCallStatus::Failed,
1292                    locations: Vec::new(),
1293                    resolved_locations: Vec::new(),
1294                    raw_input: None,
1295                    raw_output: None,
1296                };
1297                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1298                return Ok(());
1299            }
1300        };
1301        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1302            unreachable!()
1303        };
1304
1305        match update {
1306            ToolCallUpdate::UpdateFields(update) => {
1307                let location_updated = update.fields.locations.is_some();
1308                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1309                if location_updated {
1310                    self.resolve_locations(update.id, cx);
1311                }
1312            }
1313            ToolCallUpdate::UpdateDiff(update) => {
1314                call.content.clear();
1315                call.content.push(ToolCallContent::Diff(update.diff));
1316            }
1317            ToolCallUpdate::UpdateTerminal(update) => {
1318                call.content.clear();
1319                call.content
1320                    .push(ToolCallContent::Terminal(update.terminal));
1321            }
1322        }
1323
1324        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1325
1326        Ok(())
1327    }
1328
1329    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1330    pub fn upsert_tool_call(
1331        &mut self,
1332        tool_call: acp::ToolCall,
1333        cx: &mut Context<Self>,
1334    ) -> Result<(), acp::Error> {
1335        let status = tool_call.status.into();
1336        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1337    }
1338
1339    /// Fails if id does not match an existing entry.
1340    pub fn upsert_tool_call_inner(
1341        &mut self,
1342        update: acp::ToolCallUpdate,
1343        status: ToolCallStatus,
1344        cx: &mut Context<Self>,
1345    ) -> Result<(), acp::Error> {
1346        let language_registry = self.project.read(cx).languages().clone();
1347        let path_style = self.project.read(cx).path_style(cx);
1348        let id = update.id.clone();
1349
1350        if let Some(ix) = self.index_for_tool_call(&id) {
1351            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1352                unreachable!()
1353            };
1354
1355            call.update_fields(
1356                update.fields,
1357                language_registry,
1358                path_style,
1359                &self.terminals,
1360                cx,
1361            )?;
1362            call.status = status;
1363
1364            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1365        } else {
1366            let call = ToolCall::from_acp(
1367                update.try_into()?,
1368                status,
1369                language_registry,
1370                self.project.read(cx).path_style(cx),
1371                &self.terminals,
1372                cx,
1373            )?;
1374            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1375        };
1376
1377        self.resolve_locations(id, cx);
1378        Ok(())
1379    }
1380
1381    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1382        self.entries
1383            .iter()
1384            .enumerate()
1385            .rev()
1386            .find_map(|(index, entry)| {
1387                if let AgentThreadEntry::ToolCall(tool_call) = entry
1388                    && &tool_call.id == id
1389                {
1390                    Some(index)
1391                } else {
1392                    None
1393                }
1394            })
1395    }
1396
1397    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1398        // The tool call we are looking for is typically the last one, or very close to the end.
1399        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1400        self.entries
1401            .iter_mut()
1402            .enumerate()
1403            .rev()
1404            .find_map(|(index, tool_call)| {
1405                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1406                    && &tool_call.id == id
1407                {
1408                    Some((index, tool_call))
1409                } else {
1410                    None
1411                }
1412            })
1413    }
1414
1415    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1416        self.entries
1417            .iter()
1418            .enumerate()
1419            .rev()
1420            .find_map(|(index, tool_call)| {
1421                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1422                    && &tool_call.id == id
1423                {
1424                    Some((index, tool_call))
1425                } else {
1426                    None
1427                }
1428            })
1429    }
1430
1431    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1432        let project = self.project.clone();
1433        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1434            return;
1435        };
1436        let task = tool_call.resolve_locations(project, cx);
1437        cx.spawn(async move |this, cx| {
1438            let resolved_locations = task.await;
1439
1440            this.update(cx, |this, cx| {
1441                let project = this.project.clone();
1442
1443                for location in resolved_locations.iter().flatten() {
1444                    this.shared_buffers
1445                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1446                }
1447                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1448                    return;
1449                };
1450
1451                if let Some(Some(location)) = resolved_locations.last() {
1452                    project.update(cx, |project, cx| {
1453                        let should_ignore = if let Some(agent_location) = project
1454                            .agent_location()
1455                            .filter(|agent_location| agent_location.buffer == location.buffer)
1456                        {
1457                            let snapshot = location.buffer.read(cx).snapshot();
1458                            let old_position = agent_location.position.to_point(&snapshot);
1459                            let new_position = location.position.to_point(&snapshot);
1460
1461                            // ignore this so that when we get updates from the edit tool
1462                            // the position doesn't reset to the startof line
1463                            old_position.row == new_position.row
1464                                && old_position.column > new_position.column
1465                        } else {
1466                            false
1467                        };
1468                        if !should_ignore {
1469                            project.set_agent_location(Some(location.into()), cx);
1470                        }
1471                    });
1472                }
1473
1474                let resolved_locations = resolved_locations
1475                    .iter()
1476                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1477                    .collect::<Vec<_>>();
1478
1479                if tool_call.resolved_locations != resolved_locations {
1480                    tool_call.resolved_locations = resolved_locations;
1481                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1482                }
1483            })
1484        })
1485        .detach();
1486    }
1487
1488    pub fn request_tool_call_authorization(
1489        &mut self,
1490        tool_call: acp::ToolCallUpdate,
1491        options: Vec<acp::PermissionOption>,
1492        respect_always_allow_setting: bool,
1493        cx: &mut Context<Self>,
1494    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1495        let (tx, rx) = oneshot::channel();
1496
1497        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1498            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1499            // some tools would (incorrectly) continue to auto-accept.
1500            if let Some(allow_once_option) = options.iter().find_map(|option| {
1501                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1502                    Some(option.id.clone())
1503                } else {
1504                    None
1505                }
1506            }) {
1507                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1508                return Ok(async {
1509                    acp::RequestPermissionOutcome::Selected {
1510                        option_id: allow_once_option,
1511                    }
1512                }
1513                .boxed());
1514            }
1515        }
1516
1517        let status = ToolCallStatus::WaitingForConfirmation {
1518            options,
1519            respond_tx: tx,
1520        };
1521
1522        self.upsert_tool_call_inner(tool_call, status, cx)?;
1523        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1524
1525        let fut = async {
1526            match rx.await {
1527                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1528                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1529            }
1530        }
1531        .boxed();
1532
1533        Ok(fut)
1534    }
1535
1536    pub fn authorize_tool_call(
1537        &mut self,
1538        id: acp::ToolCallId,
1539        option_id: acp::PermissionOptionId,
1540        option_kind: acp::PermissionOptionKind,
1541        cx: &mut Context<Self>,
1542    ) {
1543        let Some((ix, call)) = self.tool_call_mut(&id) else {
1544            return;
1545        };
1546
1547        let new_status = match option_kind {
1548            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1549                ToolCallStatus::Rejected
1550            }
1551            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1552                ToolCallStatus::InProgress
1553            }
1554        };
1555
1556        let curr_status = mem::replace(&mut call.status, new_status);
1557
1558        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1559            respond_tx.send(option_id).log_err();
1560        } else if cfg!(debug_assertions) {
1561            panic!("tried to authorize an already authorized tool call");
1562        }
1563
1564        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1565    }
1566
1567    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1568        let mut first_tool_call = None;
1569
1570        for entry in self.entries.iter().rev() {
1571            match &entry {
1572                AgentThreadEntry::ToolCall(call) => {
1573                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1574                        first_tool_call = Some(call);
1575                    } else {
1576                        continue;
1577                    }
1578                }
1579                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1580                    // Reached the beginning of the turn.
1581                    // If we had pending permission requests in the previous turn, they have been cancelled.
1582                    break;
1583                }
1584            }
1585        }
1586
1587        first_tool_call
1588    }
1589
1590    pub fn plan(&self) -> &Plan {
1591        &self.plan
1592    }
1593
1594    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1595        let new_entries_len = request.entries.len();
1596        let mut new_entries = request.entries.into_iter();
1597
1598        // Reuse existing markdown to prevent flickering
1599        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1600            let PlanEntry {
1601                content,
1602                priority,
1603                status,
1604            } = old;
1605            content.update(cx, |old, cx| {
1606                old.replace(new.content, cx);
1607            });
1608            *priority = new.priority;
1609            *status = new.status;
1610        }
1611        for new in new_entries {
1612            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1613        }
1614        self.plan.entries.truncate(new_entries_len);
1615
1616        cx.notify();
1617    }
1618
1619    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1620        self.plan
1621            .entries
1622            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1623        cx.notify();
1624    }
1625
1626    #[cfg(any(test, feature = "test-support"))]
1627    pub fn send_raw(
1628        &mut self,
1629        message: &str,
1630        cx: &mut Context<Self>,
1631    ) -> BoxFuture<'static, Result<()>> {
1632        self.send(
1633            vec![acp::ContentBlock::Text(acp::TextContent {
1634                text: message.to_string(),
1635                annotations: None,
1636                meta: None,
1637            })],
1638            cx,
1639        )
1640    }
1641
1642    pub fn send(
1643        &mut self,
1644        message: Vec<acp::ContentBlock>,
1645        cx: &mut Context<Self>,
1646    ) -> BoxFuture<'static, Result<()>> {
1647        let block = ContentBlock::new_combined(
1648            message.clone(),
1649            self.project.read(cx).languages().clone(),
1650            self.project.read(cx).path_style(cx),
1651            cx,
1652        );
1653        let request = acp::PromptRequest {
1654            prompt: message.clone(),
1655            session_id: self.session_id.clone(),
1656            meta: None,
1657        };
1658        let git_store = self.project.read(cx).git_store().clone();
1659
1660        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1661            Some(UserMessageId::new())
1662        } else {
1663            None
1664        };
1665
1666        self.run_turn(cx, async move |this, cx| {
1667            this.update(cx, |this, cx| {
1668                this.push_entry(
1669                    AgentThreadEntry::UserMessage(UserMessage {
1670                        id: message_id.clone(),
1671                        content: block,
1672                        chunks: message,
1673                        checkpoint: None,
1674                    }),
1675                    cx,
1676                );
1677            })
1678            .ok();
1679
1680            let old_checkpoint = git_store
1681                .update(cx, |git, cx| git.checkpoint(cx))?
1682                .await
1683                .context("failed to get old checkpoint")
1684                .log_err();
1685            this.update(cx, |this, cx| {
1686                if let Some((_ix, message)) = this.last_user_message() {
1687                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1688                        git_checkpoint,
1689                        show: false,
1690                    });
1691                }
1692                this.connection.prompt(message_id, request, cx)
1693            })?
1694            .await
1695        })
1696    }
1697
1698    pub fn can_resume(&self, cx: &App) -> bool {
1699        self.connection.resume(&self.session_id, cx).is_some()
1700    }
1701
1702    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1703        self.run_turn(cx, async move |this, cx| {
1704            this.update(cx, |this, cx| {
1705                this.connection
1706                    .resume(&this.session_id, cx)
1707                    .map(|resume| resume.run(cx))
1708            })?
1709            .context("resuming a session is not supported")?
1710            .await
1711        })
1712    }
1713
1714    fn run_turn(
1715        &mut self,
1716        cx: &mut Context<Self>,
1717        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1718    ) -> BoxFuture<'static, Result<()>> {
1719        self.clear_completed_plan_entries(cx);
1720
1721        let (tx, rx) = oneshot::channel();
1722        let cancel_task = self.cancel(cx);
1723
1724        self.send_task = Some(cx.spawn(async move |this, cx| {
1725            cancel_task.await;
1726            tx.send(f(this, cx).await).ok();
1727        }));
1728
1729        cx.spawn(async move |this, cx| {
1730            let response = rx.await;
1731
1732            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1733                .await?;
1734
1735            this.update(cx, |this, cx| {
1736                this.project
1737                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1738                match response {
1739                    Ok(Err(e)) => {
1740                        this.send_task.take();
1741                        cx.emit(AcpThreadEvent::Error);
1742                        Err(e)
1743                    }
1744                    result => {
1745                        let canceled = matches!(
1746                            result,
1747                            Ok(Ok(acp::PromptResponse {
1748                                stop_reason: acp::StopReason::Cancelled,
1749                                meta: None,
1750                            }))
1751                        );
1752
1753                        // We only take the task if the current prompt wasn't canceled.
1754                        //
1755                        // This prompt may have been canceled because another one was sent
1756                        // while it was still generating. In these cases, dropping `send_task`
1757                        // would cause the next generation to be canceled.
1758                        if !canceled {
1759                            this.send_task.take();
1760                        }
1761
1762                        // Handle refusal - distinguish between user prompt and tool call refusals
1763                        if let Ok(Ok(acp::PromptResponse {
1764                            stop_reason: acp::StopReason::Refusal,
1765                            meta: _,
1766                        })) = result
1767                        {
1768                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1769                                // Check if there's a completed tool call with results after the last user message
1770                                // This indicates the refusal is in response to tool output, not the user's prompt
1771                                let has_completed_tool_call_after_user_msg =
1772                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1773                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1774                                            // Check if the tool call has completed and has output
1775                                            matches!(tool_call.status, ToolCallStatus::Completed)
1776                                                && tool_call.raw_output.is_some()
1777                                        } else {
1778                                            false
1779                                        }
1780                                    });
1781
1782                                if has_completed_tool_call_after_user_msg {
1783                                    // Refusal is due to tool output - don't truncate, just notify
1784                                    // The model refused based on what the tool returned
1785                                    cx.emit(AcpThreadEvent::Refusal);
1786                                } else {
1787                                    // User prompt was refused - truncate back to before the user message
1788                                    let range = user_msg_ix..this.entries.len();
1789                                    if range.start < range.end {
1790                                        this.entries.truncate(user_msg_ix);
1791                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1792                                    }
1793                                    cx.emit(AcpThreadEvent::Refusal);
1794                                }
1795                            } else {
1796                                // No user message found, treat as general refusal
1797                                cx.emit(AcpThreadEvent::Refusal);
1798                            }
1799                        }
1800
1801                        cx.emit(AcpThreadEvent::Stopped);
1802                        Ok(())
1803                    }
1804                }
1805            })?
1806        })
1807        .boxed()
1808    }
1809
1810    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1811        let Some(send_task) = self.send_task.take() else {
1812            return Task::ready(());
1813        };
1814
1815        for entry in self.entries.iter_mut() {
1816            if let AgentThreadEntry::ToolCall(call) = entry {
1817                let cancel = matches!(
1818                    call.status,
1819                    ToolCallStatus::Pending
1820                        | ToolCallStatus::WaitingForConfirmation { .. }
1821                        | ToolCallStatus::InProgress
1822                );
1823
1824                if cancel {
1825                    call.status = ToolCallStatus::Canceled;
1826                }
1827            }
1828        }
1829
1830        self.connection.cancel(&self.session_id, cx);
1831
1832        // Wait for the send task to complete
1833        cx.foreground_executor().spawn(send_task)
1834    }
1835
1836    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1837    pub fn restore_checkpoint(
1838        &mut self,
1839        id: UserMessageId,
1840        cx: &mut Context<Self>,
1841    ) -> Task<Result<()>> {
1842        let Some((_, message)) = self.user_message_mut(&id) else {
1843            return Task::ready(Err(anyhow!("message not found")));
1844        };
1845
1846        let checkpoint = message
1847            .checkpoint
1848            .as_ref()
1849            .map(|c| c.git_checkpoint.clone());
1850        let rewind = self.rewind(id.clone(), cx);
1851        let git_store = self.project.read(cx).git_store().clone();
1852
1853        cx.spawn(async move |_, cx| {
1854            rewind.await?;
1855            if let Some(checkpoint) = checkpoint {
1856                git_store
1857                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1858                    .await?;
1859            }
1860
1861            Ok(())
1862        })
1863    }
1864
1865    /// Rewinds this thread to before the entry at `index`, removing it and all
1866    /// subsequent entries while rejecting any action_log changes made from that point.
1867    /// Unlike `restore_checkpoint`, this method does not restore from git.
1868    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1869        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1870            return Task::ready(Err(anyhow!("not supported")));
1871        };
1872
1873        cx.spawn(async move |this, cx| {
1874            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1875            this.update(cx, |this, cx| {
1876                if let Some((ix, _)) = this.user_message_mut(&id) {
1877                    let range = ix..this.entries.len();
1878                    this.entries.truncate(ix);
1879                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1880                }
1881                this.action_log()
1882                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1883            })?
1884            .await;
1885            Ok(())
1886        })
1887    }
1888
1889    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1890        let git_store = self.project.read(cx).git_store().clone();
1891
1892        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1893            if let Some(checkpoint) = message.checkpoint.as_ref() {
1894                checkpoint.git_checkpoint.clone()
1895            } else {
1896                return Task::ready(Ok(()));
1897            }
1898        } else {
1899            return Task::ready(Ok(()));
1900        };
1901
1902        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1903        cx.spawn(async move |this, cx| {
1904            let new_checkpoint = new_checkpoint
1905                .await
1906                .context("failed to get new checkpoint")
1907                .log_err();
1908            if let Some(new_checkpoint) = new_checkpoint {
1909                let equal = git_store
1910                    .update(cx, |git, cx| {
1911                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1912                    })?
1913                    .await
1914                    .unwrap_or(true);
1915                this.update(cx, |this, cx| {
1916                    let (ix, message) = this.last_user_message().context("no user message")?;
1917                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1918                    checkpoint.show = !equal;
1919                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1920                    anyhow::Ok(())
1921                })??;
1922            }
1923
1924            Ok(())
1925        })
1926    }
1927
1928    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1929        self.entries
1930            .iter_mut()
1931            .enumerate()
1932            .rev()
1933            .find_map(|(ix, entry)| {
1934                if let AgentThreadEntry::UserMessage(message) = entry {
1935                    Some((ix, message))
1936                } else {
1937                    None
1938                }
1939            })
1940    }
1941
1942    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1943        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1944            if let AgentThreadEntry::UserMessage(message) = entry {
1945                if message.id.as_ref() == Some(id) {
1946                    Some((ix, message))
1947                } else {
1948                    None
1949                }
1950            } else {
1951                None
1952            }
1953        })
1954    }
1955
1956    pub fn read_text_file(
1957        &self,
1958        path: PathBuf,
1959        line: Option<u32>,
1960        limit: Option<u32>,
1961        reuse_shared_snapshot: bool,
1962        cx: &mut Context<Self>,
1963    ) -> Task<Result<String, acp::Error>> {
1964        // Args are 1-based, move to 0-based
1965        let line = line.unwrap_or_default().saturating_sub(1);
1966        let limit = limit.unwrap_or(u32::MAX);
1967        let project = self.project.clone();
1968        let action_log = self.action_log.clone();
1969        cx.spawn(async move |this, cx| {
1970            let load = project
1971                .update(cx, |project, cx| {
1972                    let path = project
1973                        .project_path_for_absolute_path(&path, cx)
1974                        .ok_or_else(|| {
1975                            acp::Error::resource_not_found(Some(path.display().to_string()))
1976                        })?;
1977                    Ok(project.open_buffer(path, cx))
1978                })
1979                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1980                .flatten()?;
1981
1982            let buffer = load.await?;
1983
1984            let snapshot = if reuse_shared_snapshot {
1985                this.read_with(cx, |this, _| {
1986                    this.shared_buffers.get(&buffer.clone()).cloned()
1987                })
1988                .log_err()
1989                .flatten()
1990            } else {
1991                None
1992            };
1993
1994            let snapshot = if let Some(snapshot) = snapshot {
1995                snapshot
1996            } else {
1997                action_log.update(cx, |action_log, cx| {
1998                    action_log.buffer_read(buffer.clone(), cx);
1999                })?;
2000
2001                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2002                this.update(cx, |this, _| {
2003                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2004                })?;
2005                snapshot
2006            };
2007
2008            let max_point = snapshot.max_point();
2009            let start_position = Point::new(line, 0);
2010
2011            if start_position > max_point {
2012                return Err(acp::Error::invalid_params().with_data(format!(
2013                    "Attempting to read beyond the end of the file, line {}:{}",
2014                    max_point.row + 1,
2015                    max_point.column
2016                )));
2017            }
2018
2019            let start = snapshot.anchor_before(start_position);
2020            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2021
2022            project.update(cx, |project, cx| {
2023                project.set_agent_location(
2024                    Some(AgentLocation {
2025                        buffer: buffer.downgrade(),
2026                        position: start,
2027                    }),
2028                    cx,
2029                );
2030            })?;
2031
2032            Ok(snapshot.text_for_range(start..end).collect::<String>())
2033        })
2034    }
2035
2036    pub fn write_text_file(
2037        &self,
2038        path: PathBuf,
2039        content: String,
2040        cx: &mut Context<Self>,
2041    ) -> Task<Result<()>> {
2042        let project = self.project.clone();
2043        let action_log = self.action_log.clone();
2044        cx.spawn(async move |this, cx| {
2045            let load = project.update(cx, |project, cx| {
2046                let path = project
2047                    .project_path_for_absolute_path(&path, cx)
2048                    .context("invalid path")?;
2049                anyhow::Ok(project.open_buffer(path, cx))
2050            });
2051            let buffer = load??.await?;
2052            let snapshot = this.update(cx, |this, cx| {
2053                this.shared_buffers
2054                    .get(&buffer)
2055                    .cloned()
2056                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2057            })?;
2058            let edits = cx
2059                .background_executor()
2060                .spawn(async move {
2061                    let old_text = snapshot.text();
2062                    text_diff(old_text.as_str(), &content)
2063                        .into_iter()
2064                        .map(|(range, replacement)| {
2065                            (
2066                                snapshot.anchor_after(range.start)
2067                                    ..snapshot.anchor_before(range.end),
2068                                replacement,
2069                            )
2070                        })
2071                        .collect::<Vec<_>>()
2072                })
2073                .await;
2074
2075            project.update(cx, |project, cx| {
2076                project.set_agent_location(
2077                    Some(AgentLocation {
2078                        buffer: buffer.downgrade(),
2079                        position: edits
2080                            .last()
2081                            .map(|(range, _)| range.end)
2082                            .unwrap_or(Anchor::MIN),
2083                    }),
2084                    cx,
2085                );
2086            })?;
2087
2088            let format_on_save = cx.update(|cx| {
2089                action_log.update(cx, |action_log, cx| {
2090                    action_log.buffer_read(buffer.clone(), cx);
2091                });
2092
2093                let format_on_save = buffer.update(cx, |buffer, cx| {
2094                    buffer.edit(edits, None, cx);
2095
2096                    let settings = language::language_settings::language_settings(
2097                        buffer.language().map(|l| l.name()),
2098                        buffer.file(),
2099                        cx,
2100                    );
2101
2102                    settings.format_on_save != FormatOnSave::Off
2103                });
2104                action_log.update(cx, |action_log, cx| {
2105                    action_log.buffer_edited(buffer.clone(), cx);
2106                });
2107                format_on_save
2108            })?;
2109
2110            if format_on_save {
2111                let format_task = project.update(cx, |project, cx| {
2112                    project.format(
2113                        HashSet::from_iter([buffer.clone()]),
2114                        LspFormatTarget::Buffers,
2115                        false,
2116                        FormatTrigger::Save,
2117                        cx,
2118                    )
2119                })?;
2120                format_task.await.log_err();
2121
2122                action_log.update(cx, |action_log, cx| {
2123                    action_log.buffer_edited(buffer.clone(), cx);
2124                })?;
2125            }
2126
2127            project
2128                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2129                .await
2130        })
2131    }
2132
2133    pub fn create_terminal(
2134        &self,
2135        command: String,
2136        args: Vec<String>,
2137        extra_env: Vec<acp::EnvVariable>,
2138        cwd: Option<PathBuf>,
2139        output_byte_limit: Option<u64>,
2140        cx: &mut Context<Self>,
2141    ) -> Task<Result<Entity<Terminal>>> {
2142        let env = match &cwd {
2143            Some(dir) => self.project.update(cx, |project, cx| {
2144                let worktree = project.find_worktree(dir.as_path(), cx);
2145                let shell = TerminalSettings::get(
2146                    worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2147                        worktree_id: worktree.read(cx).id(),
2148                        path: &path,
2149                    }),
2150                    cx,
2151                )
2152                .shell
2153                .clone();
2154                project.directory_environment(&shell, dir.as_path().into(), cx)
2155            }),
2156            None => Task::ready(None).shared(),
2157        };
2158        let env = cx.spawn(async move |_, _| {
2159            let mut env = env.await.unwrap_or_default();
2160            // Disables paging for `git` and hopefully other commands
2161            env.insert("PAGER".into(), "".into());
2162            for var in extra_env {
2163                env.insert(var.name, var.value);
2164            }
2165            env
2166        });
2167
2168        let project = self.project.clone();
2169        let language_registry = project.read(cx).languages().clone();
2170        let is_windows = project.read(cx).path_style(cx).is_windows();
2171
2172        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2173        let terminal_task = cx.spawn({
2174            let terminal_id = terminal_id.clone();
2175            async move |_this, cx| {
2176                let env = env.await;
2177                let shell = project
2178                    .update(cx, |project, cx| {
2179                        project
2180                            .remote_client()
2181                            .and_then(|r| r.read(cx).default_system_shell())
2182                    })?
2183                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2184                let (task_command, task_args) =
2185                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2186                        .redirect_stdin_to_dev_null()
2187                        .build(Some(command.clone()), &args);
2188                let terminal = project
2189                    .update(cx, |project, cx| {
2190                        project.create_terminal_task(
2191                            task::SpawnInTerminal {
2192                                command: Some(task_command),
2193                                args: task_args,
2194                                cwd: cwd.clone(),
2195                                env,
2196                                ..Default::default()
2197                            },
2198                            cx,
2199                        )
2200                    })?
2201                    .await?;
2202
2203                cx.new(|cx| {
2204                    Terminal::new(
2205                        terminal_id,
2206                        &format!("{} {}", command, args.join(" ")),
2207                        cwd,
2208                        output_byte_limit.map(|l| l as usize),
2209                        terminal,
2210                        language_registry,
2211                        cx,
2212                    )
2213                })
2214            }
2215        });
2216
2217        cx.spawn(async move |this, cx| {
2218            let terminal = terminal_task.await?;
2219            this.update(cx, |this, _cx| {
2220                this.terminals.insert(terminal_id, terminal.clone());
2221                terminal
2222            })
2223        })
2224    }
2225
2226    pub fn kill_terminal(
2227        &mut self,
2228        terminal_id: acp::TerminalId,
2229        cx: &mut Context<Self>,
2230    ) -> Result<()> {
2231        self.terminals
2232            .get(&terminal_id)
2233            .context("Terminal not found")?
2234            .update(cx, |terminal, cx| {
2235                terminal.kill(cx);
2236            });
2237
2238        Ok(())
2239    }
2240
2241    pub fn release_terminal(
2242        &mut self,
2243        terminal_id: acp::TerminalId,
2244        cx: &mut Context<Self>,
2245    ) -> Result<()> {
2246        self.terminals
2247            .remove(&terminal_id)
2248            .context("Terminal not found")?
2249            .update(cx, |terminal, cx| {
2250                terminal.kill(cx);
2251            });
2252
2253        Ok(())
2254    }
2255
2256    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2257        self.terminals
2258            .get(&terminal_id)
2259            .context("Terminal not found")
2260            .cloned()
2261    }
2262
2263    pub fn to_markdown(&self, cx: &App) -> String {
2264        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2265    }
2266
2267    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2268        cx.emit(AcpThreadEvent::LoadError(error));
2269    }
2270
2271    pub fn register_terminal_created(
2272        &mut self,
2273        terminal_id: acp::TerminalId,
2274        command_label: String,
2275        working_dir: Option<PathBuf>,
2276        output_byte_limit: Option<u64>,
2277        terminal: Entity<::terminal::Terminal>,
2278        cx: &mut Context<Self>,
2279    ) -> Entity<Terminal> {
2280        let language_registry = self.project.read(cx).languages().clone();
2281
2282        let entity = cx.new(|cx| {
2283            Terminal::new(
2284                terminal_id.clone(),
2285                &command_label,
2286                working_dir.clone(),
2287                output_byte_limit.map(|l| l as usize),
2288                terminal,
2289                language_registry,
2290                cx,
2291            )
2292        });
2293        self.terminals.insert(terminal_id.clone(), entity.clone());
2294        entity
2295    }
2296}
2297
2298fn markdown_for_raw_output(
2299    raw_output: &serde_json::Value,
2300    language_registry: &Arc<LanguageRegistry>,
2301    cx: &mut App,
2302) -> Option<Entity<Markdown>> {
2303    match raw_output {
2304        serde_json::Value::Null => None,
2305        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2306            Markdown::new(
2307                value.to_string().into(),
2308                Some(language_registry.clone()),
2309                None,
2310                cx,
2311            )
2312        })),
2313        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2314            Markdown::new(
2315                value.to_string().into(),
2316                Some(language_registry.clone()),
2317                None,
2318                cx,
2319            )
2320        })),
2321        serde_json::Value::String(value) => Some(cx.new(|cx| {
2322            Markdown::new(
2323                value.clone().into(),
2324                Some(language_registry.clone()),
2325                None,
2326                cx,
2327            )
2328        })),
2329        value => Some(cx.new(|cx| {
2330            Markdown::new(
2331                format!("```json\n{}\n```", value).into(),
2332                Some(language_registry.clone()),
2333                None,
2334                cx,
2335            )
2336        })),
2337    }
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342    use super::*;
2343    use anyhow::anyhow;
2344    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2345    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2346    use indoc::indoc;
2347    use project::{FakeFs, Fs};
2348    use rand::{distr, prelude::*};
2349    use serde_json::json;
2350    use settings::SettingsStore;
2351    use smol::stream::StreamExt as _;
2352    use std::{
2353        any::Any,
2354        cell::RefCell,
2355        path::Path,
2356        rc::Rc,
2357        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2358        time::Duration,
2359    };
2360    use util::path;
2361
2362    fn init_test(cx: &mut TestAppContext) {
2363        env_logger::try_init().ok();
2364        cx.update(|cx| {
2365            let settings_store = SettingsStore::test(cx);
2366            cx.set_global(settings_store);
2367            Project::init_settings(cx);
2368            language::init(cx);
2369        });
2370    }
2371
2372    #[gpui::test]
2373    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2374        init_test(cx);
2375
2376        let fs = FakeFs::new(cx.executor());
2377        let project = Project::test(fs, [], cx).await;
2378        let connection = Rc::new(FakeAgentConnection::new());
2379        let thread = cx
2380            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2381            .await
2382            .unwrap();
2383
2384        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2385
2386        // Send Output BEFORE Created - should be buffered by acp_thread
2387        thread.update(cx, |thread, cx| {
2388            thread.on_terminal_provider_event(
2389                TerminalProviderEvent::Output {
2390                    terminal_id: terminal_id.clone(),
2391                    data: b"hello buffered".to_vec(),
2392                },
2393                cx,
2394            );
2395        });
2396
2397        // Create a display-only terminal and then send Created
2398        let lower = cx.new(|cx| {
2399            let builder = ::terminal::TerminalBuilder::new_display_only(
2400                ::terminal::terminal_settings::CursorShape::default(),
2401                ::terminal::terminal_settings::AlternateScroll::On,
2402                None,
2403                0,
2404            )
2405            .unwrap();
2406            builder.subscribe(cx)
2407        });
2408
2409        thread.update(cx, |thread, cx| {
2410            thread.on_terminal_provider_event(
2411                TerminalProviderEvent::Created {
2412                    terminal_id: terminal_id.clone(),
2413                    label: "Buffered Test".to_string(),
2414                    cwd: None,
2415                    output_byte_limit: None,
2416                    terminal: lower.clone(),
2417                },
2418                cx,
2419            );
2420        });
2421
2422        // After Created, buffered Output should have been flushed into the renderer
2423        let content = thread.read_with(cx, |thread, cx| {
2424            let term = thread.terminal(terminal_id.clone()).unwrap();
2425            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2426        });
2427
2428        assert!(
2429            content.contains("hello buffered"),
2430            "expected buffered output to render, got: {content}"
2431        );
2432    }
2433
2434    #[gpui::test]
2435    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2436        init_test(cx);
2437
2438        let fs = FakeFs::new(cx.executor());
2439        let project = Project::test(fs, [], cx).await;
2440        let connection = Rc::new(FakeAgentConnection::new());
2441        let thread = cx
2442            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2443            .await
2444            .unwrap();
2445
2446        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2447
2448        // Send Output BEFORE Created
2449        thread.update(cx, |thread, cx| {
2450            thread.on_terminal_provider_event(
2451                TerminalProviderEvent::Output {
2452                    terminal_id: terminal_id.clone(),
2453                    data: b"pre-exit data".to_vec(),
2454                },
2455                cx,
2456            );
2457        });
2458
2459        // Send Exit BEFORE Created
2460        thread.update(cx, |thread, cx| {
2461            thread.on_terminal_provider_event(
2462                TerminalProviderEvent::Exit {
2463                    terminal_id: terminal_id.clone(),
2464                    status: acp::TerminalExitStatus {
2465                        exit_code: Some(0),
2466                        signal: None,
2467                        meta: None,
2468                    },
2469                },
2470                cx,
2471            );
2472        });
2473
2474        // Now create a display-only lower-level terminal and send Created
2475        let lower = cx.new(|cx| {
2476            let builder = ::terminal::TerminalBuilder::new_display_only(
2477                ::terminal::terminal_settings::CursorShape::default(),
2478                ::terminal::terminal_settings::AlternateScroll::On,
2479                None,
2480                0,
2481            )
2482            .unwrap();
2483            builder.subscribe(cx)
2484        });
2485
2486        thread.update(cx, |thread, cx| {
2487            thread.on_terminal_provider_event(
2488                TerminalProviderEvent::Created {
2489                    terminal_id: terminal_id.clone(),
2490                    label: "Buffered Exit Test".to_string(),
2491                    cwd: None,
2492                    output_byte_limit: None,
2493                    terminal: lower.clone(),
2494                },
2495                cx,
2496            );
2497        });
2498
2499        // Output should be present after Created (flushed from buffer)
2500        let content = thread.read_with(cx, |thread, cx| {
2501            let term = thread.terminal(terminal_id.clone()).unwrap();
2502            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2503        });
2504
2505        assert!(
2506            content.contains("pre-exit data"),
2507            "expected pre-exit data to render, got: {content}"
2508        );
2509    }
2510
2511    #[gpui::test]
2512    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2513        init_test(cx);
2514
2515        let fs = FakeFs::new(cx.executor());
2516        let project = Project::test(fs, [], cx).await;
2517        let connection = Rc::new(FakeAgentConnection::new());
2518        let thread = cx
2519            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2520            .await
2521            .unwrap();
2522
2523        // Test creating a new user message
2524        thread.update(cx, |thread, cx| {
2525            thread.push_user_content_block(
2526                None,
2527                acp::ContentBlock::Text(acp::TextContent {
2528                    annotations: None,
2529                    text: "Hello, ".to_string(),
2530                    meta: None,
2531                }),
2532                cx,
2533            );
2534        });
2535
2536        thread.update(cx, |thread, cx| {
2537            assert_eq!(thread.entries.len(), 1);
2538            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2539                assert_eq!(user_msg.id, None);
2540                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2541            } else {
2542                panic!("Expected UserMessage");
2543            }
2544        });
2545
2546        // Test appending to existing user message
2547        let message_1_id = UserMessageId::new();
2548        thread.update(cx, |thread, cx| {
2549            thread.push_user_content_block(
2550                Some(message_1_id.clone()),
2551                acp::ContentBlock::Text(acp::TextContent {
2552                    annotations: None,
2553                    text: "world!".to_string(),
2554                    meta: None,
2555                }),
2556                cx,
2557            );
2558        });
2559
2560        thread.update(cx, |thread, cx| {
2561            assert_eq!(thread.entries.len(), 1);
2562            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2563                assert_eq!(user_msg.id, Some(message_1_id));
2564                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2565            } else {
2566                panic!("Expected UserMessage");
2567            }
2568        });
2569
2570        // Test creating new user message after assistant message
2571        thread.update(cx, |thread, cx| {
2572            thread.push_assistant_content_block(
2573                acp::ContentBlock::Text(acp::TextContent {
2574                    annotations: None,
2575                    text: "Assistant response".to_string(),
2576                    meta: None,
2577                }),
2578                false,
2579                cx,
2580            );
2581        });
2582
2583        let message_2_id = UserMessageId::new();
2584        thread.update(cx, |thread, cx| {
2585            thread.push_user_content_block(
2586                Some(message_2_id.clone()),
2587                acp::ContentBlock::Text(acp::TextContent {
2588                    annotations: None,
2589                    text: "New user message".to_string(),
2590                    meta: None,
2591                }),
2592                cx,
2593            );
2594        });
2595
2596        thread.update(cx, |thread, cx| {
2597            assert_eq!(thread.entries.len(), 3);
2598            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2599                assert_eq!(user_msg.id, Some(message_2_id));
2600                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2601            } else {
2602                panic!("Expected UserMessage at index 2");
2603            }
2604        });
2605    }
2606
2607    #[gpui::test]
2608    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2609        init_test(cx);
2610
2611        let fs = FakeFs::new(cx.executor());
2612        let project = Project::test(fs, [], cx).await;
2613        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2614            |_, thread, mut cx| {
2615                async move {
2616                    thread.update(&mut cx, |thread, cx| {
2617                        thread
2618                            .handle_session_update(
2619                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2620                                    content: "Thinking ".into(),
2621                                    meta: None,
2622                                }),
2623                                cx,
2624                            )
2625                            .unwrap();
2626                        thread
2627                            .handle_session_update(
2628                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2629                                    content: "hard!".into(),
2630                                    meta: None,
2631                                }),
2632                                cx,
2633                            )
2634                            .unwrap();
2635                    })?;
2636                    Ok(acp::PromptResponse {
2637                        stop_reason: acp::StopReason::EndTurn,
2638                        meta: None,
2639                    })
2640                }
2641                .boxed_local()
2642            },
2643        ));
2644
2645        let thread = cx
2646            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2647            .await
2648            .unwrap();
2649
2650        thread
2651            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2652            .await
2653            .unwrap();
2654
2655        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2656        assert_eq!(
2657            output,
2658            indoc! {r#"
2659            ## User
2660
2661            Hello from Zed!
2662
2663            ## Assistant
2664
2665            <thinking>
2666            Thinking hard!
2667            </thinking>
2668
2669            "#}
2670        );
2671    }
2672
2673    #[gpui::test]
2674    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2675        init_test(cx);
2676
2677        let fs = FakeFs::new(cx.executor());
2678        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2679            .await;
2680        let project = Project::test(fs.clone(), [], cx).await;
2681        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2682        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2683        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2684            move |_, thread, mut cx| {
2685                let read_file_tx = read_file_tx.clone();
2686                async move {
2687                    let content = thread
2688                        .update(&mut cx, |thread, cx| {
2689                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2690                        })
2691                        .unwrap()
2692                        .await
2693                        .unwrap();
2694                    assert_eq!(content, "one\ntwo\nthree\n");
2695                    read_file_tx.take().unwrap().send(()).unwrap();
2696                    thread
2697                        .update(&mut cx, |thread, cx| {
2698                            thread.write_text_file(
2699                                path!("/tmp/foo").into(),
2700                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2701                                cx,
2702                            )
2703                        })
2704                        .unwrap()
2705                        .await
2706                        .unwrap();
2707                    Ok(acp::PromptResponse {
2708                        stop_reason: acp::StopReason::EndTurn,
2709                        meta: None,
2710                    })
2711                }
2712                .boxed_local()
2713            },
2714        ));
2715
2716        let (worktree, pathbuf) = project
2717            .update(cx, |project, cx| {
2718                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2719            })
2720            .await
2721            .unwrap();
2722        let buffer = project
2723            .update(cx, |project, cx| {
2724                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2725            })
2726            .await
2727            .unwrap();
2728
2729        let thread = cx
2730            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2731            .await
2732            .unwrap();
2733
2734        let request = thread.update(cx, |thread, cx| {
2735            thread.send_raw("Extend the count in /tmp/foo", cx)
2736        });
2737        read_file_rx.await.ok();
2738        buffer.update(cx, |buffer, cx| {
2739            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2740        });
2741        cx.run_until_parked();
2742        assert_eq!(
2743            buffer.read_with(cx, |buffer, _| buffer.text()),
2744            "zero\none\ntwo\nthree\nfour\nfive\n"
2745        );
2746        assert_eq!(
2747            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2748            "zero\none\ntwo\nthree\nfour\nfive\n"
2749        );
2750        request.await.unwrap();
2751    }
2752
2753    #[gpui::test]
2754    async fn test_reading_from_line(cx: &mut TestAppContext) {
2755        init_test(cx);
2756
2757        let fs = FakeFs::new(cx.executor());
2758        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2759            .await;
2760        let project = Project::test(fs.clone(), [], cx).await;
2761        project
2762            .update(cx, |project, cx| {
2763                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2764            })
2765            .await
2766            .unwrap();
2767
2768        let connection = Rc::new(FakeAgentConnection::new());
2769
2770        let thread = cx
2771            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2772            .await
2773            .unwrap();
2774
2775        // Whole file
2776        let content = thread
2777            .update(cx, |thread, cx| {
2778                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2779            })
2780            .await
2781            .unwrap();
2782
2783        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2784
2785        // Only start line
2786        let content = thread
2787            .update(cx, |thread, cx| {
2788                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2789            })
2790            .await
2791            .unwrap();
2792
2793        assert_eq!(content, "three\nfour\n");
2794
2795        // Only limit
2796        let content = thread
2797            .update(cx, |thread, cx| {
2798                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2799            })
2800            .await
2801            .unwrap();
2802
2803        assert_eq!(content, "one\ntwo\n");
2804
2805        // Range
2806        let content = thread
2807            .update(cx, |thread, cx| {
2808                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2809            })
2810            .await
2811            .unwrap();
2812
2813        assert_eq!(content, "two\nthree\n");
2814
2815        // Invalid
2816        let err = thread
2817            .update(cx, |thread, cx| {
2818                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2819            })
2820            .await
2821            .unwrap_err();
2822
2823        assert_eq!(
2824            err.to_string(),
2825            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2826        );
2827    }
2828
2829    #[gpui::test]
2830    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2831        init_test(cx);
2832
2833        let fs = FakeFs::new(cx.executor());
2834        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2835        let project = Project::test(fs.clone(), [], cx).await;
2836        project
2837            .update(cx, |project, cx| {
2838                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2839            })
2840            .await
2841            .unwrap();
2842
2843        let connection = Rc::new(FakeAgentConnection::new());
2844
2845        let thread = cx
2846            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2847            .await
2848            .unwrap();
2849
2850        // Whole file
2851        let content = thread
2852            .update(cx, |thread, cx| {
2853                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2854            })
2855            .await
2856            .unwrap();
2857
2858        assert_eq!(content, "");
2859
2860        // Only start line
2861        let content = thread
2862            .update(cx, |thread, cx| {
2863                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2864            })
2865            .await
2866            .unwrap();
2867
2868        assert_eq!(content, "");
2869
2870        // Only limit
2871        let content = thread
2872            .update(cx, |thread, cx| {
2873                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2874            })
2875            .await
2876            .unwrap();
2877
2878        assert_eq!(content, "");
2879
2880        // Range
2881        let content = thread
2882            .update(cx, |thread, cx| {
2883                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2884            })
2885            .await
2886            .unwrap();
2887
2888        assert_eq!(content, "");
2889
2890        // Invalid
2891        let err = thread
2892            .update(cx, |thread, cx| {
2893                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2894            })
2895            .await
2896            .unwrap_err();
2897
2898        assert_eq!(
2899            err.to_string(),
2900            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2901        );
2902    }
2903    #[gpui::test]
2904    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2905        init_test(cx);
2906
2907        let fs = FakeFs::new(cx.executor());
2908        fs.insert_tree(path!("/tmp"), json!({})).await;
2909        let project = Project::test(fs.clone(), [], cx).await;
2910        project
2911            .update(cx, |project, cx| {
2912                project.find_or_create_worktree(path!("/tmp"), true, cx)
2913            })
2914            .await
2915            .unwrap();
2916
2917        let connection = Rc::new(FakeAgentConnection::new());
2918
2919        let thread = cx
2920            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2921            .await
2922            .unwrap();
2923
2924        // Out of project file
2925        let err = thread
2926            .update(cx, |thread, cx| {
2927                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2928            })
2929            .await
2930            .unwrap_err();
2931
2932        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2933    }
2934
2935    #[gpui::test]
2936    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2937        init_test(cx);
2938
2939        let fs = FakeFs::new(cx.executor());
2940        let project = Project::test(fs, [], cx).await;
2941        let id = acp::ToolCallId("test".into());
2942
2943        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2944            let id = id.clone();
2945            move |_, thread, mut cx| {
2946                let id = id.clone();
2947                async move {
2948                    thread
2949                        .update(&mut cx, |thread, cx| {
2950                            thread.handle_session_update(
2951                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2952                                    id: id.clone(),
2953                                    title: "Label".into(),
2954                                    kind: acp::ToolKind::Fetch,
2955                                    status: acp::ToolCallStatus::InProgress,
2956                                    content: vec![],
2957                                    locations: vec![],
2958                                    raw_input: None,
2959                                    raw_output: None,
2960                                    meta: None,
2961                                }),
2962                                cx,
2963                            )
2964                        })
2965                        .unwrap()
2966                        .unwrap();
2967                    Ok(acp::PromptResponse {
2968                        stop_reason: acp::StopReason::EndTurn,
2969                        meta: None,
2970                    })
2971                }
2972                .boxed_local()
2973            }
2974        }));
2975
2976        let thread = cx
2977            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2978            .await
2979            .unwrap();
2980
2981        let request = thread.update(cx, |thread, cx| {
2982            thread.send_raw("Fetch https://example.com", cx)
2983        });
2984
2985        run_until_first_tool_call(&thread, cx).await;
2986
2987        thread.read_with(cx, |thread, _| {
2988            assert!(matches!(
2989                thread.entries[1],
2990                AgentThreadEntry::ToolCall(ToolCall {
2991                    status: ToolCallStatus::InProgress,
2992                    ..
2993                })
2994            ));
2995        });
2996
2997        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2998
2999        thread.read_with(cx, |thread, _| {
3000            assert!(matches!(
3001                &thread.entries[1],
3002                AgentThreadEntry::ToolCall(ToolCall {
3003                    status: ToolCallStatus::Canceled,
3004                    ..
3005                })
3006            ));
3007        });
3008
3009        thread
3010            .update(cx, |thread, cx| {
3011                thread.handle_session_update(
3012                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3013                        id,
3014                        fields: acp::ToolCallUpdateFields {
3015                            status: Some(acp::ToolCallStatus::Completed),
3016                            ..Default::default()
3017                        },
3018                        meta: None,
3019                    }),
3020                    cx,
3021                )
3022            })
3023            .unwrap();
3024
3025        request.await.unwrap();
3026
3027        thread.read_with(cx, |thread, _| {
3028            assert!(matches!(
3029                thread.entries[1],
3030                AgentThreadEntry::ToolCall(ToolCall {
3031                    status: ToolCallStatus::Completed,
3032                    ..
3033                })
3034            ));
3035        });
3036    }
3037
3038    #[gpui::test]
3039    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3040        init_test(cx);
3041        let fs = FakeFs::new(cx.background_executor.clone());
3042        fs.insert_tree(path!("/test"), json!({})).await;
3043        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3044
3045        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3046            move |_, thread, mut cx| {
3047                async move {
3048                    thread
3049                        .update(&mut cx, |thread, cx| {
3050                            thread.handle_session_update(
3051                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3052                                    id: acp::ToolCallId("test".into()),
3053                                    title: "Label".into(),
3054                                    kind: acp::ToolKind::Edit,
3055                                    status: acp::ToolCallStatus::Completed,
3056                                    content: vec![acp::ToolCallContent::Diff {
3057                                        diff: acp::Diff {
3058                                            path: "/test/test.txt".into(),
3059                                            old_text: None,
3060                                            new_text: "foo".into(),
3061                                            meta: None,
3062                                        },
3063                                    }],
3064                                    locations: vec![],
3065                                    raw_input: None,
3066                                    raw_output: None,
3067                                    meta: None,
3068                                }),
3069                                cx,
3070                            )
3071                        })
3072                        .unwrap()
3073                        .unwrap();
3074                    Ok(acp::PromptResponse {
3075                        stop_reason: acp::StopReason::EndTurn,
3076                        meta: None,
3077                    })
3078                }
3079                .boxed_local()
3080            }
3081        }));
3082
3083        let thread = cx
3084            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3085            .await
3086            .unwrap();
3087
3088        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3089            .await
3090            .unwrap();
3091
3092        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3093    }
3094
3095    #[gpui::test(iterations = 10)]
3096    async fn test_checkpoints(cx: &mut TestAppContext) {
3097        init_test(cx);
3098        let fs = FakeFs::new(cx.background_executor.clone());
3099        fs.insert_tree(
3100            path!("/test"),
3101            json!({
3102                ".git": {}
3103            }),
3104        )
3105        .await;
3106        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3107
3108        let simulate_changes = Arc::new(AtomicBool::new(true));
3109        let next_filename = Arc::new(AtomicUsize::new(0));
3110        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3111            let simulate_changes = simulate_changes.clone();
3112            let next_filename = next_filename.clone();
3113            let fs = fs.clone();
3114            move |request, thread, mut cx| {
3115                let fs = fs.clone();
3116                let simulate_changes = simulate_changes.clone();
3117                let next_filename = next_filename.clone();
3118                async move {
3119                    if simulate_changes.load(SeqCst) {
3120                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3121                        fs.write(Path::new(&filename), b"").await?;
3122                    }
3123
3124                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3125                        panic!("expected text content block");
3126                    };
3127                    thread.update(&mut cx, |thread, cx| {
3128                        thread
3129                            .handle_session_update(
3130                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3131                                    content: content.text.to_uppercase().into(),
3132                                    meta: None,
3133                                }),
3134                                cx,
3135                            )
3136                            .unwrap();
3137                    })?;
3138                    Ok(acp::PromptResponse {
3139                        stop_reason: acp::StopReason::EndTurn,
3140                        meta: None,
3141                    })
3142                }
3143                .boxed_local()
3144            }
3145        }));
3146        let thread = cx
3147            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3148            .await
3149            .unwrap();
3150
3151        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3152            .await
3153            .unwrap();
3154        thread.read_with(cx, |thread, cx| {
3155            assert_eq!(
3156                thread.to_markdown(cx),
3157                indoc! {"
3158                    ## User (checkpoint)
3159
3160                    Lorem
3161
3162                    ## Assistant
3163
3164                    LOREM
3165
3166                "}
3167            );
3168        });
3169        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3170
3171        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3172            .await
3173            .unwrap();
3174        thread.read_with(cx, |thread, cx| {
3175            assert_eq!(
3176                thread.to_markdown(cx),
3177                indoc! {"
3178                    ## User (checkpoint)
3179
3180                    Lorem
3181
3182                    ## Assistant
3183
3184                    LOREM
3185
3186                    ## User (checkpoint)
3187
3188                    ipsum
3189
3190                    ## Assistant
3191
3192                    IPSUM
3193
3194                "}
3195            );
3196        });
3197        assert_eq!(
3198            fs.files(),
3199            vec![
3200                Path::new(path!("/test/file-0")),
3201                Path::new(path!("/test/file-1"))
3202            ]
3203        );
3204
3205        // Checkpoint isn't stored when there are no changes.
3206        simulate_changes.store(false, SeqCst);
3207        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3208            .await
3209            .unwrap();
3210        thread.read_with(cx, |thread, cx| {
3211            assert_eq!(
3212                thread.to_markdown(cx),
3213                indoc! {"
3214                    ## User (checkpoint)
3215
3216                    Lorem
3217
3218                    ## Assistant
3219
3220                    LOREM
3221
3222                    ## User (checkpoint)
3223
3224                    ipsum
3225
3226                    ## Assistant
3227
3228                    IPSUM
3229
3230                    ## User
3231
3232                    dolor
3233
3234                    ## Assistant
3235
3236                    DOLOR
3237
3238                "}
3239            );
3240        });
3241        assert_eq!(
3242            fs.files(),
3243            vec![
3244                Path::new(path!("/test/file-0")),
3245                Path::new(path!("/test/file-1"))
3246            ]
3247        );
3248
3249        // Rewinding the conversation truncates the history and restores the checkpoint.
3250        thread
3251            .update(cx, |thread, cx| {
3252                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3253                    panic!("unexpected entries {:?}", thread.entries)
3254                };
3255                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3256            })
3257            .await
3258            .unwrap();
3259        thread.read_with(cx, |thread, cx| {
3260            assert_eq!(
3261                thread.to_markdown(cx),
3262                indoc! {"
3263                    ## User (checkpoint)
3264
3265                    Lorem
3266
3267                    ## Assistant
3268
3269                    LOREM
3270
3271                "}
3272            );
3273        });
3274        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3275    }
3276
3277    #[gpui::test]
3278    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3279        use std::sync::atomic::AtomicUsize;
3280        init_test(cx);
3281
3282        let fs = FakeFs::new(cx.executor());
3283        let project = Project::test(fs, None, cx).await;
3284
3285        // Create a connection that simulates refusal after tool result
3286        let prompt_count = Arc::new(AtomicUsize::new(0));
3287        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3288            let prompt_count = prompt_count.clone();
3289            move |_request, thread, mut cx| {
3290                let count = prompt_count.fetch_add(1, SeqCst);
3291                async move {
3292                    if count == 0 {
3293                        // First prompt: Generate a tool call with result
3294                        thread.update(&mut cx, |thread, cx| {
3295                            thread
3296                                .handle_session_update(
3297                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3298                                        id: acp::ToolCallId("tool1".into()),
3299                                        title: "Test Tool".into(),
3300                                        kind: acp::ToolKind::Fetch,
3301                                        status: acp::ToolCallStatus::Completed,
3302                                        content: vec![],
3303                                        locations: vec![],
3304                                        raw_input: Some(serde_json::json!({"query": "test"})),
3305                                        raw_output: Some(
3306                                            serde_json::json!({"result": "inappropriate content"}),
3307                                        ),
3308                                        meta: None,
3309                                    }),
3310                                    cx,
3311                                )
3312                                .unwrap();
3313                        })?;
3314
3315                        // Now return refusal because of the tool result
3316                        Ok(acp::PromptResponse {
3317                            stop_reason: acp::StopReason::Refusal,
3318                            meta: None,
3319                        })
3320                    } else {
3321                        Ok(acp::PromptResponse {
3322                            stop_reason: acp::StopReason::EndTurn,
3323                            meta: None,
3324                        })
3325                    }
3326                }
3327                .boxed_local()
3328            }
3329        }));
3330
3331        let thread = cx
3332            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3333            .await
3334            .unwrap();
3335
3336        // Track if we see a Refusal event
3337        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3338        let saw_refusal_event_captured = saw_refusal_event.clone();
3339        thread.update(cx, |_thread, cx| {
3340            cx.subscribe(
3341                &thread,
3342                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3343                    if matches!(event, AcpThreadEvent::Refusal) {
3344                        *saw_refusal_event_captured.lock().unwrap() = true;
3345                    }
3346                },
3347            )
3348            .detach();
3349        });
3350
3351        // Send a user message - this will trigger tool call and then refusal
3352        let send_task = thread.update(cx, |thread, cx| {
3353            thread.send(
3354                vec![acp::ContentBlock::Text(acp::TextContent {
3355                    text: "Hello".into(),
3356                    annotations: None,
3357                    meta: None,
3358                })],
3359                cx,
3360            )
3361        });
3362        cx.background_executor.spawn(send_task).detach();
3363        cx.run_until_parked();
3364
3365        // Verify that:
3366        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3367        // 2. The user message was NOT truncated
3368        assert!(
3369            *saw_refusal_event.lock().unwrap(),
3370            "Refusal event should be emitted for tool result refusals"
3371        );
3372
3373        thread.read_with(cx, |thread, _| {
3374            let entries = thread.entries();
3375            assert!(entries.len() >= 2, "Should have user message and tool call");
3376
3377            // Verify user message is still there
3378            assert!(
3379                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3380                "User message should not be truncated"
3381            );
3382
3383            // Verify tool call is there with result
3384            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3385                assert!(
3386                    tool_call.raw_output.is_some(),
3387                    "Tool call should have output"
3388                );
3389            } else {
3390                panic!("Expected tool call at index 1");
3391            }
3392        });
3393    }
3394
3395    #[gpui::test]
3396    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3397        init_test(cx);
3398
3399        let fs = FakeFs::new(cx.executor());
3400        let project = Project::test(fs, None, cx).await;
3401
3402        let refuse_next = Arc::new(AtomicBool::new(false));
3403        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3404            let refuse_next = refuse_next.clone();
3405            move |_request, _thread, _cx| {
3406                if refuse_next.load(SeqCst) {
3407                    async move {
3408                        Ok(acp::PromptResponse {
3409                            stop_reason: acp::StopReason::Refusal,
3410                            meta: None,
3411                        })
3412                    }
3413                    .boxed_local()
3414                } else {
3415                    async move {
3416                        Ok(acp::PromptResponse {
3417                            stop_reason: acp::StopReason::EndTurn,
3418                            meta: None,
3419                        })
3420                    }
3421                    .boxed_local()
3422                }
3423            }
3424        }));
3425
3426        let thread = cx
3427            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3428            .await
3429            .unwrap();
3430
3431        // Track if we see a Refusal event
3432        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3433        let saw_refusal_event_captured = saw_refusal_event.clone();
3434        thread.update(cx, |_thread, cx| {
3435            cx.subscribe(
3436                &thread,
3437                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3438                    if matches!(event, AcpThreadEvent::Refusal) {
3439                        *saw_refusal_event_captured.lock().unwrap() = true;
3440                    }
3441                },
3442            )
3443            .detach();
3444        });
3445
3446        // Send a message that will be refused
3447        refuse_next.store(true, SeqCst);
3448        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3449            .await
3450            .unwrap();
3451
3452        // Verify that a Refusal event WAS emitted for user prompt refusal
3453        assert!(
3454            *saw_refusal_event.lock().unwrap(),
3455            "Refusal event should be emitted for user prompt refusals"
3456        );
3457
3458        // Verify the message was truncated (user prompt refusal)
3459        thread.read_with(cx, |thread, cx| {
3460            assert_eq!(thread.to_markdown(cx), "");
3461        });
3462    }
3463
3464    #[gpui::test]
3465    async fn test_refusal(cx: &mut TestAppContext) {
3466        init_test(cx);
3467        let fs = FakeFs::new(cx.background_executor.clone());
3468        fs.insert_tree(path!("/"), json!({})).await;
3469        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3470
3471        let refuse_next = Arc::new(AtomicBool::new(false));
3472        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3473            let refuse_next = refuse_next.clone();
3474            move |request, thread, mut cx| {
3475                let refuse_next = refuse_next.clone();
3476                async move {
3477                    if refuse_next.load(SeqCst) {
3478                        return Ok(acp::PromptResponse {
3479                            stop_reason: acp::StopReason::Refusal,
3480                            meta: None,
3481                        });
3482                    }
3483
3484                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3485                        panic!("expected text content block");
3486                    };
3487                    thread.update(&mut cx, |thread, cx| {
3488                        thread
3489                            .handle_session_update(
3490                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3491                                    content: content.text.to_uppercase().into(),
3492                                    meta: None,
3493                                }),
3494                                cx,
3495                            )
3496                            .unwrap();
3497                    })?;
3498                    Ok(acp::PromptResponse {
3499                        stop_reason: acp::StopReason::EndTurn,
3500                        meta: None,
3501                    })
3502                }
3503                .boxed_local()
3504            }
3505        }));
3506        let thread = cx
3507            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3508            .await
3509            .unwrap();
3510
3511        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3512            .await
3513            .unwrap();
3514        thread.read_with(cx, |thread, cx| {
3515            assert_eq!(
3516                thread.to_markdown(cx),
3517                indoc! {"
3518                    ## User
3519
3520                    hello
3521
3522                    ## Assistant
3523
3524                    HELLO
3525
3526                "}
3527            );
3528        });
3529
3530        // Simulate refusing the second message. The message should be truncated
3531        // when a user prompt is refused.
3532        refuse_next.store(true, SeqCst);
3533        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3534            .await
3535            .unwrap();
3536        thread.read_with(cx, |thread, cx| {
3537            assert_eq!(
3538                thread.to_markdown(cx),
3539                indoc! {"
3540                    ## User
3541
3542                    hello
3543
3544                    ## Assistant
3545
3546                    HELLO
3547
3548                "}
3549            );
3550        });
3551    }
3552
3553    async fn run_until_first_tool_call(
3554        thread: &Entity<AcpThread>,
3555        cx: &mut TestAppContext,
3556    ) -> usize {
3557        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3558
3559        let subscription = cx.update(|cx| {
3560            cx.subscribe(thread, move |thread, _, cx| {
3561                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3562                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3563                        return tx.try_send(ix).unwrap();
3564                    }
3565                }
3566            })
3567        });
3568
3569        select! {
3570            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3571                panic!("Timeout waiting for tool call")
3572            }
3573            ix = rx.next().fuse() => {
3574                drop(subscription);
3575                ix.unwrap()
3576            }
3577        }
3578    }
3579
3580    #[derive(Clone, Default)]
3581    struct FakeAgentConnection {
3582        auth_methods: Vec<acp::AuthMethod>,
3583        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3584        on_user_message: Option<
3585            Rc<
3586                dyn Fn(
3587                        acp::PromptRequest,
3588                        WeakEntity<AcpThread>,
3589                        AsyncApp,
3590                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3591                    + 'static,
3592            >,
3593        >,
3594    }
3595
3596    impl FakeAgentConnection {
3597        fn new() -> Self {
3598            Self {
3599                auth_methods: Vec::new(),
3600                on_user_message: None,
3601                sessions: Arc::default(),
3602            }
3603        }
3604
3605        #[expect(unused)]
3606        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3607            self.auth_methods = auth_methods;
3608            self
3609        }
3610
3611        fn on_user_message(
3612            mut self,
3613            handler: impl Fn(
3614                acp::PromptRequest,
3615                WeakEntity<AcpThread>,
3616                AsyncApp,
3617            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3618            + 'static,
3619        ) -> Self {
3620            self.on_user_message.replace(Rc::new(handler));
3621            self
3622        }
3623    }
3624
3625    impl AgentConnection for FakeAgentConnection {
3626        fn auth_methods(&self) -> &[acp::AuthMethod] {
3627            &self.auth_methods
3628        }
3629
3630        fn new_thread(
3631            self: Rc<Self>,
3632            project: Entity<Project>,
3633            _cwd: &Path,
3634            cx: &mut App,
3635        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3636            let session_id = acp::SessionId(
3637                rand::rng()
3638                    .sample_iter(&distr::Alphanumeric)
3639                    .take(7)
3640                    .map(char::from)
3641                    .collect::<String>()
3642                    .into(),
3643            );
3644            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3645            let thread = cx.new(|cx| {
3646                AcpThread::new(
3647                    "Test",
3648                    self.clone(),
3649                    project,
3650                    action_log,
3651                    session_id.clone(),
3652                    watch::Receiver::constant(acp::PromptCapabilities {
3653                        image: true,
3654                        audio: true,
3655                        embedded_context: true,
3656                        meta: None,
3657                    }),
3658                    cx,
3659                )
3660            });
3661            self.sessions.lock().insert(session_id, thread.downgrade());
3662            Task::ready(Ok(thread))
3663        }
3664
3665        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3666            if self.auth_methods().iter().any(|m| m.id == method) {
3667                Task::ready(Ok(()))
3668            } else {
3669                Task::ready(Err(anyhow!("Invalid Auth Method")))
3670            }
3671        }
3672
3673        fn prompt(
3674            &self,
3675            _id: Option<UserMessageId>,
3676            params: acp::PromptRequest,
3677            cx: &mut App,
3678        ) -> Task<gpui::Result<acp::PromptResponse>> {
3679            let sessions = self.sessions.lock();
3680            let thread = sessions.get(¶ms.session_id).unwrap();
3681            if let Some(handler) = &self.on_user_message {
3682                let handler = handler.clone();
3683                let thread = thread.clone();
3684                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3685            } else {
3686                Task::ready(Ok(acp::PromptResponse {
3687                    stop_reason: acp::StopReason::EndTurn,
3688                    meta: None,
3689                }))
3690            }
3691        }
3692
3693        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3694            let sessions = self.sessions.lock();
3695            let thread = sessions.get(session_id).unwrap().clone();
3696
3697            cx.spawn(async move |cx| {
3698                thread
3699                    .update(cx, |thread, cx| thread.cancel(cx))
3700                    .unwrap()
3701                    .await
3702            })
3703            .detach();
3704        }
3705
3706        fn truncate(
3707            &self,
3708            session_id: &acp::SessionId,
3709            _cx: &App,
3710        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3711            Some(Rc::new(FakeAgentSessionEditor {
3712                _session_id: session_id.clone(),
3713            }))
3714        }
3715
3716        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3717            self
3718        }
3719    }
3720
3721    struct FakeAgentSessionEditor {
3722        _session_id: acp::SessionId,
3723    }
3724
3725    impl AgentSessionTruncate for FakeAgentSessionEditor {
3726        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3727            Task::ready(Ok(()))
3728        }
3729    }
3730
3731    #[gpui::test]
3732    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3733        init_test(cx);
3734
3735        let fs = FakeFs::new(cx.executor());
3736        let project = Project::test(fs, [], cx).await;
3737        let connection = Rc::new(FakeAgentConnection::new());
3738        let thread = cx
3739            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3740            .await
3741            .unwrap();
3742
3743        // Try to update a tool call that doesn't exist
3744        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3745        thread.update(cx, |thread, cx| {
3746            let result = thread.handle_session_update(
3747                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3748                    id: nonexistent_id.clone(),
3749                    fields: acp::ToolCallUpdateFields {
3750                        status: Some(acp::ToolCallStatus::Completed),
3751                        ..Default::default()
3752                    },
3753                    meta: None,
3754                }),
3755                cx,
3756            );
3757
3758            // The update should succeed (not return an error)
3759            assert!(result.is_ok());
3760
3761            // There should now be exactly one entry in the thread
3762            assert_eq!(thread.entries.len(), 1);
3763
3764            // The entry should be a failed tool call
3765            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3766                assert_eq!(tool_call.id, nonexistent_id);
3767                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3768                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3769
3770                // Check that the content contains the error message
3771                assert_eq!(tool_call.content.len(), 1);
3772                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3773                    match content_block {
3774                        ContentBlock::Markdown { markdown } => {
3775                            let markdown_text = markdown.read(cx).source();
3776                            assert!(markdown_text.contains("Tool call not found"));
3777                        }
3778                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3779                        ContentBlock::ResourceLink { .. } => {
3780                            panic!("Expected markdown content, got resource link")
3781                        }
3782                    }
3783                } else {
3784                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3785                }
3786            } else {
3787                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3788            }
3789        });
3790    }
3791}