acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use ::terminal::terminal_settings::TerminalSettings;
   7use agent_settings::AgentSettings;
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use language::language_settings::FormatOnSave;
  12pub use mention::*;
  13use project::lsp_store::{FormatTrigger, LspFormatTarget};
  14use serde::{Deserialize, Serialize};
  15use settings::{Settings as _, SettingsLocation};
  16use task::{Shell, ShellBuilder};
  17pub use terminal::*;
  18
  19use action_log::ActionLog;
  20use agent_client_protocol::{self as acp};
  21use anyhow::{Context as _, Result, anyhow};
  22use editor::Bias;
  23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  25use itertools::Itertools;
  26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  27use markdown::Markdown;
  28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  29use std::collections::HashMap;
  30use std::error::Error;
  31use std::fmt::{Formatter, Write};
  32use std::ops::Range;
  33use std::process::ExitStatus;
  34use std::rc::Rc;
  35use std::time::{Duration, Instant};
  36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  37use ui::App;
  38use util::{ResultExt, get_default_system_shell_preferring_bash};
  39use uuid::Uuid;
  40
/// A message authored by the user within a thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of this message, when one was assigned.
    pub id: Option<UserMessageId>,
    /// Content that `to_markdown` renders for this message.
    pub content: ContentBlock,
    /// Protocol-level content blocks of the message.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm at the construction site.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  48
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// Checkpoint handle of type `GitStoreCheckpoint` (from `project::git_store`).
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message heading is rendered as `## User (checkpoint)`.
    pub show: bool,
}
  54
  55impl UserMessage {
  56    fn to_markdown(&self, cx: &App) -> String {
  57        let mut markdown = String::new();
  58        if self
  59            .checkpoint
  60            .as_ref()
  61            .is_some_and(|checkpoint| checkpoint.show)
  62        {
  63            writeln!(markdown, "## User (checkpoint)").unwrap();
  64        } else {
  65            writeln!(markdown, "## User").unwrap();
  66        }
  67        writeln!(markdown).unwrap();
  68        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  69        writeln!(markdown).unwrap();
  70        markdown
  71    }
  72}
  73
/// A message produced by the assistant, stored as an ordered list of chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Chunks of the message, in the order they are rendered.
    pub chunks: Vec<AssistantMessageChunk>,
}
  78
  79impl AssistantMessage {
  80    pub fn to_markdown(&self, cx: &App) -> String {
  81        format!(
  82            "## Assistant\n\n{}\n\n",
  83            self.chunks
  84                .iter()
  85                .map(|chunk| chunk.to_markdown(cx))
  86                .join("\n\n")
  87        )
  88    }
  89}
  90
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content, rendered as-is.
    Message { block: ContentBlock },
    /// Content rendered wrapped in `<thinking>` tags by `to_markdown`.
    Thought { block: ContentBlock },
}
  96
  97impl AssistantMessageChunk {
  98    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  99        Self::Message {
 100            block: ContentBlock::new(chunk.into(), language_registry, cx),
 101        }
 102    }
 103
 104    fn to_markdown(&self, cx: &App) -> String {
 105        match self {
 106            Self::Message { block } => block.to_markdown(cx).to_string(),
 107            Self::Thought { block } => {
 108                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 109            }
 110        }
 111    }
 112}
 113
/// A single entry in a thread: a user message, an assistant message, or a
/// tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation and its content.
    ToolCall(ToolCall),
}
 120
 121impl AgentThreadEntry {
 122    pub fn to_markdown(&self, cx: &App) -> String {
 123        match self {
 124            Self::UserMessage(message) => message.to_markdown(cx),
 125            Self::AssistantMessage(message) => message.to_markdown(cx),
 126            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 127        }
 128    }
 129
 130    pub fn user_message(&self) -> Option<&UserMessage> {
 131        if let AgentThreadEntry::UserMessage(message) = self {
 132            Some(message)
 133        } else {
 134            None
 135        }
 136    }
 137
 138    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 139        if let AgentThreadEntry::ToolCall(call) = self {
 140            itertools::Either::Left(call.diffs())
 141        } else {
 142            itertools::Either::Right(std::iter::empty())
 143        }
 144    }
 145
 146    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 147        if let AgentThreadEntry::ToolCall(call) = self {
 148            itertools::Either::Left(call.terminals())
 149        } else {
 150            itertools::Either::Right(std::iter::empty())
 151        }
 152    }
 153
 154    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 155        if let AgentThreadEntry::ToolCall(ToolCall {
 156            locations,
 157            resolved_locations,
 158            ..
 159        }) = self
 160        {
 161            Some((
 162                locations.get(ix)?.clone(),
 163                resolved_locations.get(ix)?.clone()?,
 164            ))
 165        } else {
 166            None
 167        }
 168    }
 169}
 170
/// Thread-side state of a tool call reported by the agent.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the title shown for the call.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported through the protocol.
    pub kind: acp::ToolKind,
    /// Content items attached to the call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current lifecycle status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for the call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index in
    /// `AgentThreadEntry::location`.
    // NOTE(review): `None` presumably marks a location that could not be resolved — confirm.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the call, when provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw JSON output of the call, when provided.
    pub raw_output: Option<serde_json::Value>,
}
 183
 184impl ToolCall {
 185    fn from_acp(
 186        tool_call: acp::ToolCall,
 187        status: ToolCallStatus,
 188        language_registry: Arc<LanguageRegistry>,
 189        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 190        cx: &mut App,
 191    ) -> Result<Self> {
 192        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 193            first_line.to_owned() + ""
 194        } else {
 195            tool_call.title
 196        };
 197        let mut content = Vec::with_capacity(tool_call.content.len());
 198        for item in tool_call.content {
 199            content.push(ToolCallContent::from_acp(
 200                item,
 201                language_registry.clone(),
 202                terminals,
 203                cx,
 204            )?);
 205        }
 206
 207        let result = Self {
 208            id: tool_call.id,
 209            label: cx
 210                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 211            kind: tool_call.kind,
 212            content,
 213            locations: tool_call.locations,
 214            resolved_locations: Vec::default(),
 215            status,
 216            raw_input: tool_call.raw_input,
 217            raw_output: tool_call.raw_output,
 218        };
 219        Ok(result)
 220    }
 221
 222    fn update_fields(
 223        &mut self,
 224        fields: acp::ToolCallUpdateFields,
 225        language_registry: Arc<LanguageRegistry>,
 226        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 227        cx: &mut App,
 228    ) -> Result<()> {
 229        let acp::ToolCallUpdateFields {
 230            kind,
 231            status,
 232            title,
 233            content,
 234            locations,
 235            raw_input,
 236            raw_output,
 237        } = fields;
 238
 239        if let Some(kind) = kind {
 240            self.kind = kind;
 241        }
 242
 243        if let Some(status) = status {
 244            self.status = status.into();
 245        }
 246
 247        if let Some(title) = title {
 248            self.label.update(cx, |label, cx| {
 249                if let Some((first_line, _)) = title.split_once("\n") {
 250                    label.replace(first_line.to_owned() + "", cx)
 251                } else {
 252                    label.replace(title, cx);
 253                }
 254            });
 255        }
 256
 257        if let Some(content) = content {
 258            let new_content_len = content.len();
 259            let mut content = content.into_iter();
 260
 261            // Reuse existing content if we can
 262            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 263                old.update_from_acp(new, language_registry.clone(), terminals, cx)?;
 264            }
 265            for new in content {
 266                self.content.push(ToolCallContent::from_acp(
 267                    new,
 268                    language_registry.clone(),
 269                    terminals,
 270                    cx,
 271                )?)
 272            }
 273            self.content.truncate(new_content_len);
 274        }
 275
 276        if let Some(locations) = locations {
 277            self.locations = locations;
 278        }
 279
 280        if let Some(raw_input) = raw_input {
 281            self.raw_input = Some(raw_input);
 282        }
 283
 284        if let Some(raw_output) = raw_output {
 285            if self.content.is_empty()
 286                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 287            {
 288                self.content
 289                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 290                        markdown,
 291                    }));
 292            }
 293            self.raw_output = Some(raw_output);
 294        }
 295        Ok(())
 296    }
 297
 298    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 299        self.content.iter().filter_map(|content| match content {
 300            ToolCallContent::Diff(diff) => Some(diff),
 301            ToolCallContent::ContentBlock(_) => None,
 302            ToolCallContent::Terminal(_) => None,
 303        })
 304    }
 305
 306    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Terminal(terminal) => Some(terminal),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Diff(_) => None,
 311        })
 312    }
 313
 314    fn to_markdown(&self, cx: &App) -> String {
 315        let mut markdown = format!(
 316            "**Tool Call: {}**\nStatus: {}\n\n",
 317            self.label.read(cx).source(),
 318            self.status
 319        );
 320        for content in &self.content {
 321            markdown.push_str(content.to_markdown(cx).as_str());
 322            markdown.push_str("\n\n");
 323        }
 324        markdown
 325    }
 326
 327    async fn resolve_location(
 328        location: acp::ToolCallLocation,
 329        project: WeakEntity<Project>,
 330        cx: &mut AsyncApp,
 331    ) -> Option<ResolvedLocation> {
 332        let buffer = project
 333            .update(cx, |project, cx| {
 334                project
 335                    .project_path_for_absolute_path(&location.path, cx)
 336                    .map(|path| project.open_buffer(path, cx))
 337            })
 338            .ok()??;
 339        let buffer = buffer.await.log_err()?;
 340        let position = buffer
 341            .update(cx, |buffer, _| {
 342                if let Some(row) = location.line {
 343                    let snapshot = buffer.snapshot();
 344                    let column = snapshot.indent_size_for_line(row).len;
 345                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 346                    snapshot.anchor_before(point)
 347                } else {
 348                    Anchor::MIN
 349                }
 350            })
 351            .ok()?;
 352
 353        Some(ResolvedLocation { buffer, position })
 354    }
 355
 356    fn resolve_locations(
 357        &self,
 358        project: Entity<Project>,
 359        cx: &mut App,
 360    ) -> Task<Vec<Option<ResolvedLocation>>> {
 361        let locations = self.locations.clone();
 362        project.update(cx, |_, cx| {
 363            cx.spawn(async move |project, cx| {
 364                let mut new_locations = Vec::new();
 365                for location in locations {
 366                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 367                }
 368                new_locations
 369            })
 370        })
 371    }
 372}
 373
/// A tool-call location resolved to a buffer and an anchor within it.
///
/// Kept separate from `AgentLocation` so that we can hold a strong reference
/// to the buffer for saving on the thread (`AgentLocation` is built from a
/// downgraded handle).
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer.
    buffer: Entity<Buffer>,
    /// Anchor within `buffer`.
    position: Anchor,
}
 381
 382impl From<&ResolvedLocation> for AgentLocation {
 383    fn from(value: &ResolvedLocation) -> Self {
 384        Self {
 385            buffer: value.buffer.downgrade(),
 386            position: value.position,
 387        }
 388    }
 389}
 390
/// Status of a [`ToolCall`] as tracked by the thread.
///
/// Extends the protocol's status with thread-only states: waiting for
/// confirmation, rejected, and canceled.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// Permission options offered for this call.
        options: Vec<acp::PermissionOption>,
        /// One-shot sender carrying the id of a permission option.
        // NOTE(review): presumably the option chosen by the user — confirm at the send site.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 412
 413impl From<acp::ToolCallStatus> for ToolCallStatus {
 414    fn from(status: acp::ToolCallStatus) -> Self {
 415        match status {
 416            acp::ToolCallStatus::Pending => Self::Pending,
 417            acp::ToolCallStatus::InProgress => Self::InProgress,
 418            acp::ToolCallStatus::Completed => Self::Completed,
 419            acp::ToolCallStatus::Failed => Self::Failed,
 420        }
 421    }
 422}
 423
 424impl Display for ToolCallStatus {
 425    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 426        write!(
 427            f,
 428            "{}",
 429            match self {
 430                ToolCallStatus::Pending => "Pending",
 431                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 432                ToolCallStatus::InProgress => "In Progress",
 433                ToolCallStatus::Completed => "Completed",
 434                ToolCallStatus::Failed => "Failed",
 435                ToolCallStatus::Rejected => "Rejected",
 436                ToolCallStatus::Canceled => "Canceled",
 437            }
 438        )
 439    }
 440}
 441
/// Accumulated displayable content built from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Content held in a markdown entity; further appends extend it.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link; appending anything else converts it to markdown.
    ResourceLink { resource_link: acp::ResourceLink },
}
 448
 449impl ContentBlock {
 450    pub fn new(
 451        block: acp::ContentBlock,
 452        language_registry: &Arc<LanguageRegistry>,
 453        cx: &mut App,
 454    ) -> Self {
 455        let mut this = Self::Empty;
 456        this.append(block, language_registry, cx);
 457        this
 458    }
 459
 460    pub fn new_combined(
 461        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 462        language_registry: Arc<LanguageRegistry>,
 463        cx: &mut App,
 464    ) -> Self {
 465        let mut this = Self::Empty;
 466        for block in blocks {
 467            this.append(block, &language_registry, cx);
 468        }
 469        this
 470    }
 471
 472    pub fn append(
 473        &mut self,
 474        block: acp::ContentBlock,
 475        language_registry: &Arc<LanguageRegistry>,
 476        cx: &mut App,
 477    ) {
 478        if matches!(self, ContentBlock::Empty)
 479            && let acp::ContentBlock::ResourceLink(resource_link) = block
 480        {
 481            *self = ContentBlock::ResourceLink { resource_link };
 482            return;
 483        }
 484
 485        let new_content = self.block_string_contents(block);
 486
 487        match self {
 488            ContentBlock::Empty => {
 489                *self = Self::create_markdown_block(new_content, language_registry, cx);
 490            }
 491            ContentBlock::Markdown { markdown } => {
 492                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 493            }
 494            ContentBlock::ResourceLink { resource_link } => {
 495                let existing_content = Self::resource_link_md(&resource_link.uri);
 496                let combined = format!("{}\n{}", existing_content, new_content);
 497
 498                *self = Self::create_markdown_block(combined, language_registry, cx);
 499            }
 500        }
 501    }
 502
 503    fn create_markdown_block(
 504        content: String,
 505        language_registry: &Arc<LanguageRegistry>,
 506        cx: &mut App,
 507    ) -> ContentBlock {
 508        ContentBlock::Markdown {
 509            markdown: cx
 510                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 511        }
 512    }
 513
 514    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 515        match block {
 516            acp::ContentBlock::Text(text_content) => text_content.text,
 517            acp::ContentBlock::ResourceLink(resource_link) => {
 518                Self::resource_link_md(&resource_link.uri)
 519            }
 520            acp::ContentBlock::Resource(acp::EmbeddedResource {
 521                resource:
 522                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 523                        uri,
 524                        ..
 525                    }),
 526                ..
 527            }) => Self::resource_link_md(&uri),
 528            acp::ContentBlock::Image(image) => Self::image_md(&image),
 529            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 530        }
 531    }
 532
 533    fn resource_link_md(uri: &str) -> String {
 534        if let Some(uri) = MentionUri::parse(uri).log_err() {
 535            uri.as_link().to_string()
 536        } else {
 537            uri.to_string()
 538        }
 539    }
 540
 541    fn image_md(_image: &acp::ImageContent) -> String {
 542        "`Image`".into()
 543    }
 544
 545    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 546        match self {
 547            ContentBlock::Empty => "",
 548            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 549            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 550        }
 551    }
 552
 553    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 554        match self {
 555            ContentBlock::Empty => None,
 556            ContentBlock::Markdown { markdown } => Some(markdown),
 557            ContentBlock::ResourceLink { .. } => None,
 558        }
 559    }
 560
 561    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 562        match self {
 563            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 564            _ => None,
 565        }
 566    }
 567}
 568
/// One content item attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Plain content (markdown, resource link, or empty).
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 575
 576impl ToolCallContent {
 577    pub fn from_acp(
 578        content: acp::ToolCallContent,
 579        language_registry: Arc<LanguageRegistry>,
 580        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 581        cx: &mut App,
 582    ) -> Result<Self> {
 583        match content {
 584            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 585                content,
 586                &language_registry,
 587                cx,
 588            ))),
 589            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 590                Diff::finalized(
 591                    diff.path.to_string_lossy().into_owned(),
 592                    diff.old_text,
 593                    diff.new_text,
 594                    language_registry,
 595                    cx,
 596                )
 597            }))),
 598            acp::ToolCallContent::Terminal { terminal_id } => terminals
 599                .get(&terminal_id)
 600                .cloned()
 601                .map(Self::Terminal)
 602                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 603        }
 604    }
 605
 606    pub fn update_from_acp(
 607        &mut self,
 608        new: acp::ToolCallContent,
 609        language_registry: Arc<LanguageRegistry>,
 610        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 611        cx: &mut App,
 612    ) -> Result<()> {
 613        let needs_update = match (&self, &new) {
 614            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 615                old_diff.read(cx).needs_update(
 616                    new_diff.old_text.as_deref().unwrap_or(""),
 617                    &new_diff.new_text,
 618                    cx,
 619                )
 620            }
 621            _ => true,
 622        };
 623
 624        if needs_update {
 625            *self = Self::from_acp(new, language_registry, terminals, cx)?;
 626        }
 627        Ok(())
 628    }
 629
 630    pub fn to_markdown(&self, cx: &App) -> String {
 631        match self {
 632            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 633            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 634            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 635        }
 636    }
 637}
 638
/// An update targeting an existing tool call, identified by its id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol-level update of the call's fields.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 645
 646impl ToolCallUpdate {
 647    fn id(&self) -> &acp::ToolCallId {
 648        match self {
 649            Self::UpdateFields(update) => &update.id,
 650            Self::UpdateDiff(diff) => &diff.id,
 651            Self::UpdateTerminal(terminal) => &terminal.id,
 652        }
 653    }
 654}
 655
 656impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 657    fn from(update: acp::ToolCallUpdate) -> Self {
 658        Self::UpdateFields(update)
 659    }
 660}
 661
 662impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 663    fn from(diff: ToolCallUpdateDiff) -> Self {
 664        Self::UpdateDiff(diff)
 665    }
 666}
 667
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
 673
 674impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 675    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 676        Self::UpdateTerminal(terminal)
 677    }
 678}
 679
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
 685
/// A plan: an ordered list of entries. The default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// Entries of the plan, in order.
    pub entries: Vec<PlanEntry>,
}
 690
/// Summary of a [`Plan`], as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 697
 698impl Plan {
 699    pub fn is_empty(&self) -> bool {
 700        self.entries.is_empty()
 701    }
 702
 703    pub fn stats(&self) -> PlanStats<'_> {
 704        let mut stats = PlanStats {
 705            in_progress_entry: None,
 706            pending: 0,
 707            completed: 0,
 708        };
 709
 710        for entry in &self.entries {
 711            match &entry.status {
 712                acp::PlanEntryStatus::Pending => {
 713                    stats.pending += 1;
 714                }
 715                acp::PlanEntryStatus::InProgress => {
 716                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 717                }
 718                acp::PlanEntryStatus::Completed => {
 719                    stats.completed += 1;
 720                }
 721            }
 722        }
 723
 724        stats
 725    }
 726}
 727
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as reported through the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status of the entry, as reported through the protocol.
    pub status: acp::PlanEntryStatus,
}
 734
 735impl PlanEntry {
 736    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 737        Self {
 738            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 739            priority: entry.priority,
 740            status: entry.status,
 741        }
 742    }
 743}
 744
/// Token consumption relative to a maximum.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` is treated by `ratio` as "unknown".
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
 750
 751impl TokenUsage {
 752    pub fn ratio(&self) -> TokenUsageRatio {
 753        #[cfg(debug_assertions)]
 754        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 755            .unwrap_or("0.8".to_string())
 756            .parse()
 757            .unwrap();
 758        #[cfg(not(debug_assertions))]
 759        let warning_threshold: f32 = 0.8;
 760
 761        // When the maximum is unknown because there is no selected model,
 762        // avoid showing the token limit warning.
 763        if self.max_tokens == 0 {
 764            TokenUsageRatio::Normal
 765        } else if self.used_tokens >= self.max_tokens {
 766            TokenUsageRatio::Exceeded
 767        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 768            TokenUsageRatio::Warning
 769        } else {
 770            TokenUsageRatio::Normal
 771        }
 772    }
 773}
 774
/// Classification of token usage returned by `TokenUsage::ratio`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
 781
/// Information about a retry, carried by `AcpThreadEvent::Retry`.
// NOTE(review): field meanings below are inferred from names and types; the
// code that fills them in is not visible in this part of the file.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Text of the most recent error.
    pub last_error: SharedString,
    /// Attempt counter (presumably the current attempt — confirm base).
    pub attempt: usize,
    /// Upper bound on the number of attempts.
    pub max_attempts: usize,
    /// An instant (presumably when the retry wait began — confirm).
    pub started_at: Instant,
    /// A duration (presumably how long to wait before retrying — confirm).
    pub duration: Duration,
}
 790
/// State of a single agent thread.
// NOTE(review): most of this type's behavior is implemented outside the part
// of the file visible here; field notes are limited to what the types and the
// visible code establish.
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Entries of the thread (user messages, assistant messages, tool calls).
    entries: Vec<AgentThreadEntry>,
    /// Current plan.
    plan: Plan,
    /// Project the thread operates on.
    project: Entity<Project>,
    /// Action log entity associated with the thread.
    action_log: Entity<ActionLog>,
    /// Buffer snapshots keyed by buffer.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task handle stored while a send is in flight (presumably — confirm).
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Protocol session id of this thread.
    session_id: acp::SessionId,
    /// Latest known token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Prompt capabilities; initialized from the watch receiver passed to `new`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task held for the lifetime of the thread (presumably observing
    /// prompt-capability changes, going by the name — confirm).
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Registered terminals, keyed by terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminals that are not registered yet;
    /// replayed when the terminal's `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminals that are not registered yet;
    /// consumed when the terminal's `Created` event arrives.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
 808
/// Events emitted by [`AcpThread`].
// NOTE(review): the emit sites are not visible in this part of the file, so
// per-variant notes are limited to what the payload types show.
#[derive(Debug)]
pub enum AcpThreadEvent {
    NewEntry,
    TitleUpdated,
    TokenUsageUpdated,
    /// Carries a `usize` (presumably the index of the updated entry — confirm).
    EntryUpdated(usize),
    /// Carries a `Range<usize>` (presumably the removed entry indices — confirm).
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    /// Carries the status of a retry.
    Retry(RetryStatus),
    Stopped,
    Error,
    /// Carries the load error.
    LoadError(LoadError),
    PromptCapabilitiesUpdated,
    Refusal,
    /// Carries a list of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// Carries a session mode id.
    ModeUpdated(acp::SessionModeId),
}
 826
// Declares `AcpThreadEvent` as an event type that `AcpThread` emits.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 828
/// Terminal events handled by `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created; the thread registers it and replays any output
    /// or exit notification that arrived earlier for the same id.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered if the terminal is not yet
    /// registered.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed; applied as its breadcrumb text. Ignored
    /// if the terminal is not registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited; remembered if the terminal is not yet registered.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
 851
/// Commands addressed to a terminal, identified by its id.
// NOTE(review): no producer or consumer of these commands is visible in this
// part of the file; variant notes are inferred from names and field types.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Bytes to write as input to the terminal.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// New size for the terminal, in columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Request to close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
 867
 868impl AcpThread {
 869    pub fn on_terminal_provider_event(
 870        &mut self,
 871        event: TerminalProviderEvent,
 872        cx: &mut Context<Self>,
 873    ) {
 874        match event {
 875            TerminalProviderEvent::Created {
 876                terminal_id,
 877                label,
 878                cwd,
 879                output_byte_limit,
 880                terminal,
 881            } => {
 882                let entity = self.register_terminal_created(
 883                    terminal_id.clone(),
 884                    label,
 885                    cwd,
 886                    output_byte_limit,
 887                    terminal,
 888                    cx,
 889                );
 890
 891                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 892                    for data in chunks.drain(..) {
 893                        entity.update(cx, |term, cx| {
 894                            term.inner().update(cx, |inner, cx| {
 895                                inner.write_output(&data, cx);
 896                            })
 897                        });
 898                    }
 899                }
 900
 901                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 902                    entity.update(cx, |_term, cx| {
 903                        cx.notify();
 904                    });
 905                }
 906
 907                cx.notify();
 908            }
 909            TerminalProviderEvent::Output { terminal_id, data } => {
 910                if let Some(entity) = self.terminals.get(&terminal_id) {
 911                    entity.update(cx, |term, cx| {
 912                        term.inner().update(cx, |inner, cx| {
 913                            inner.write_output(&data, cx);
 914                        })
 915                    });
 916                } else {
 917                    self.pending_terminal_output
 918                        .entry(terminal_id)
 919                        .or_default()
 920                        .push(data);
 921                }
 922            }
 923            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 924                if let Some(entity) = self.terminals.get(&terminal_id) {
 925                    entity.update(cx, |term, cx| {
 926                        term.inner().update(cx, |inner, cx| {
 927                            inner.breadcrumb_text = title;
 928                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 929                        })
 930                    });
 931                }
 932            }
 933            TerminalProviderEvent::Exit {
 934                terminal_id,
 935                status,
 936            } => {
 937                if let Some(entity) = self.terminals.get(&terminal_id) {
 938                    entity.update(cx, |_term, cx| {
 939                        cx.notify();
 940                    });
 941                } else {
 942                    self.pending_terminal_exit.insert(terminal_id, status);
 943                }
 944            }
 945        }
 946    }
 947}
 948
 949#[derive(PartialEq, Eq, Debug)]
 950pub enum ThreadStatus {
 951    Idle,
 952    Generating,
 953}
 954
/// Errors describing why loading failed; the user-facing wording for each
/// variant lives in the `Display` implementation.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The detected version is older than the minimum supported version.
    Unsupported {
        /// Command the version was obtained from (shown after "from" in the message).
        command: SharedString,
        /// Version that was found.
        current_version: SharedString,
        /// Oldest version that is accepted.
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited.
    Exited {
        /// Exit status reported for the process.
        status: ExitStatus,
    },
    /// Any other failure, carried as a ready-to-display message.
    Other(SharedString),
}
 968
 969impl Display for LoadError {
 970    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 971        match self {
 972            LoadError::Unsupported {
 973                command: path,
 974                current_version,
 975                minimum_version,
 976            } => {
 977                write!(
 978                    f,
 979                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
 980                )
 981            }
 982            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
 983            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
 984            LoadError::Other(msg) => write!(f, "{msg}"),
 985        }
 986    }
 987}
 988
// Marker impl: `LoadError` relies on the default `Error` methods and its
// `Debug`/`Display` implementations.
impl Error for LoadError {}
 990
 991impl AcpThread {
 992    pub fn new(
 993        title: impl Into<SharedString>,
 994        connection: Rc<dyn AgentConnection>,
 995        project: Entity<Project>,
 996        action_log: Entity<ActionLog>,
 997        session_id: acp::SessionId,
 998        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
 999        cx: &mut Context<Self>,
1000    ) -> Self {
1001        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1002        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1003            loop {
1004                let caps = prompt_capabilities_rx.recv().await?;
1005                this.update(cx, |this, cx| {
1006                    this.prompt_capabilities = caps;
1007                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1008                })?;
1009            }
1010        });
1011
1012        Self {
1013            action_log,
1014            shared_buffers: Default::default(),
1015            entries: Default::default(),
1016            plan: Default::default(),
1017            title: title.into(),
1018            project,
1019            send_task: None,
1020            connection,
1021            session_id,
1022            token_usage: None,
1023            prompt_capabilities,
1024            _observe_prompt_capabilities: task,
1025            terminals: HashMap::default(),
1026            pending_terminal_output: HashMap::default(),
1027            pending_terminal_exit: HashMap::default(),
1028        }
1029    }
1030
    /// Returns a clone of the most recently stored prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1034
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1038
    /// Returns the action log entity this thread was created with.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1042
    /// Returns the project entity this thread was created with.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1046
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1050
    /// Returns all entries of the thread, oldest first (entries are appended
    /// with `push`).
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1054
    /// Returns the id of the session this thread was created for.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1058
1059    pub fn status(&self) -> ThreadStatus {
1060        if self.send_task.is_some() {
1061            ThreadStatus::Generating
1062        } else {
1063            ThreadStatus::Idle
1064        }
1065    }
1066
    /// Returns the last token usage stored via `update_token_usage`, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1070
1071    pub fn has_pending_edit_tool_calls(&self) -> bool {
1072        for entry in self.entries.iter().rev() {
1073            match entry {
1074                AgentThreadEntry::UserMessage(_) => return false,
1075                AgentThreadEntry::ToolCall(
1076                    call @ ToolCall {
1077                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1078                        ..
1079                    },
1080                ) if call.diffs().next().is_some() => {
1081                    return true;
1082                }
1083                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1084            }
1085        }
1086
1087        false
1088    }
1089
1090    pub fn used_tools_since_last_user_message(&self) -> bool {
1091        for entry in self.entries.iter().rev() {
1092            match entry {
1093                AgentThreadEntry::UserMessage(..) => return false,
1094                AgentThreadEntry::AssistantMessage(..) => continue,
1095                AgentThreadEntry::ToolCall(..) => return true,
1096            }
1097        }
1098
1099        false
1100    }
1101
1102    pub fn handle_session_update(
1103        &mut self,
1104        update: acp::SessionUpdate,
1105        cx: &mut Context<Self>,
1106    ) -> Result<(), acp::Error> {
1107        match update {
1108            acp::SessionUpdate::UserMessageChunk { content } => {
1109                self.push_user_content_block(None, content, cx);
1110            }
1111            acp::SessionUpdate::AgentMessageChunk { content } => {
1112                self.push_assistant_content_block(content, false, cx);
1113            }
1114            acp::SessionUpdate::AgentThoughtChunk { content } => {
1115                self.push_assistant_content_block(content, true, cx);
1116            }
1117            acp::SessionUpdate::ToolCall(tool_call) => {
1118                self.upsert_tool_call(tool_call, cx)?;
1119            }
1120            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1121                self.update_tool_call(tool_call_update, cx)?;
1122            }
1123            acp::SessionUpdate::Plan(plan) => {
1124                self.update_plan(plan, cx);
1125            }
1126            acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
1127                cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands))
1128            }
1129            acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
1130                cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id))
1131            }
1132        }
1133        Ok(())
1134    }
1135
1136    pub fn push_user_content_block(
1137        &mut self,
1138        message_id: Option<UserMessageId>,
1139        chunk: acp::ContentBlock,
1140        cx: &mut Context<Self>,
1141    ) {
1142        let language_registry = self.project.read(cx).languages().clone();
1143        let entries_len = self.entries.len();
1144
1145        if let Some(last_entry) = self.entries.last_mut()
1146            && let AgentThreadEntry::UserMessage(UserMessage {
1147                id,
1148                content,
1149                chunks,
1150                ..
1151            }) = last_entry
1152        {
1153            *id = message_id.or(id.take());
1154            content.append(chunk.clone(), &language_registry, cx);
1155            chunks.push(chunk);
1156            let idx = entries_len - 1;
1157            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1158        } else {
1159            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
1160            self.push_entry(
1161                AgentThreadEntry::UserMessage(UserMessage {
1162                    id: message_id,
1163                    content,
1164                    chunks: vec![chunk],
1165                    checkpoint: None,
1166                }),
1167                cx,
1168            );
1169        }
1170    }
1171
1172    pub fn push_assistant_content_block(
1173        &mut self,
1174        chunk: acp::ContentBlock,
1175        is_thought: bool,
1176        cx: &mut Context<Self>,
1177    ) {
1178        let language_registry = self.project.read(cx).languages().clone();
1179        let entries_len = self.entries.len();
1180        if let Some(last_entry) = self.entries.last_mut()
1181            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1182        {
1183            let idx = entries_len - 1;
1184            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1185            match (chunks.last_mut(), is_thought) {
1186                (Some(AssistantMessageChunk::Message { block }), false)
1187                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1188                    block.append(chunk, &language_registry, cx)
1189                }
1190                _ => {
1191                    let block = ContentBlock::new(chunk, &language_registry, cx);
1192                    if is_thought {
1193                        chunks.push(AssistantMessageChunk::Thought { block })
1194                    } else {
1195                        chunks.push(AssistantMessageChunk::Message { block })
1196                    }
1197                }
1198            }
1199        } else {
1200            let block = ContentBlock::new(chunk, &language_registry, cx);
1201            let chunk = if is_thought {
1202                AssistantMessageChunk::Thought { block }
1203            } else {
1204                AssistantMessageChunk::Message { block }
1205            };
1206
1207            self.push_entry(
1208                AgentThreadEntry::AssistantMessage(AssistantMessage {
1209                    chunks: vec![chunk],
1210                }),
1211                cx,
1212            );
1213        }
1214    }
1215
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1220
    /// Returns `true` when the connection's `set_title` yields a handler for
    /// this session, i.e. the title can be forwarded to the agent.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1224
1225    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1226        if title != self.title {
1227            self.title = title.clone();
1228            cx.emit(AcpThreadEvent::TitleUpdated);
1229            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1230                return set_title.run(title, cx);
1231            }
1232        }
1233        Task::ready(Ok(()))
1234    }
1235
    /// Replaces the stored token usage (`None` clears it) and emits
    /// [`AcpThreadEvent::TokenUsageUpdated`].
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1240
    /// Forwards `status` to observers as [`AcpThreadEvent::Retry`]; no state
    /// is stored on the thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1244
1245    pub fn update_tool_call(
1246        &mut self,
1247        update: impl Into<ToolCallUpdate>,
1248        cx: &mut Context<Self>,
1249    ) -> Result<()> {
1250        let update = update.into();
1251        let languages = self.project.read(cx).languages().clone();
1252
1253        let ix = match self.index_for_tool_call(update.id()) {
1254            Some(ix) => ix,
1255            None => {
1256                // Tool call not found - create a failed tool call entry
1257                let failed_tool_call = ToolCall {
1258                    id: update.id().clone(),
1259                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1260                    kind: acp::ToolKind::Fetch,
1261                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1262                        acp::ContentBlock::Text(acp::TextContent {
1263                            text: "Tool call not found".to_string(),
1264                            annotations: None,
1265                            meta: None,
1266                        }),
1267                        &languages,
1268                        cx,
1269                    ))],
1270                    status: ToolCallStatus::Failed,
1271                    locations: Vec::new(),
1272                    resolved_locations: Vec::new(),
1273                    raw_input: None,
1274                    raw_output: None,
1275                };
1276                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1277                return Ok(());
1278            }
1279        };
1280        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1281            unreachable!()
1282        };
1283
1284        match update {
1285            ToolCallUpdate::UpdateFields(update) => {
1286                let location_updated = update.fields.locations.is_some();
1287                call.update_fields(update.fields, languages, &self.terminals, cx)?;
1288                if location_updated {
1289                    self.resolve_locations(update.id, cx);
1290                }
1291            }
1292            ToolCallUpdate::UpdateDiff(update) => {
1293                call.content.clear();
1294                call.content.push(ToolCallContent::Diff(update.diff));
1295            }
1296            ToolCallUpdate::UpdateTerminal(update) => {
1297                call.content.clear();
1298                call.content
1299                    .push(ToolCallContent::Terminal(update.terminal));
1300            }
1301        }
1302
1303        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1304
1305        Ok(())
1306    }
1307
    /// Updates the tool call whose id matches `tool_call`, or inserts it as a
    /// new entry when no such tool call exists.
    ///
    /// The status carried by `tool_call` is converted and applied together
    /// with the remaining fields via `upsert_tool_call_inner`.
    ///
    /// # Errors
    ///
    /// Returns whatever `upsert_tool_call_inner` returns.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        // Convert the status first: `tool_call` is consumed by `into()` below.
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1317
1318    /// Fails if id does not match an existing entry.
1319    pub fn upsert_tool_call_inner(
1320        &mut self,
1321        update: acp::ToolCallUpdate,
1322        status: ToolCallStatus,
1323        cx: &mut Context<Self>,
1324    ) -> Result<(), acp::Error> {
1325        let language_registry = self.project.read(cx).languages().clone();
1326        let id = update.id.clone();
1327
1328        if let Some(ix) = self.index_for_tool_call(&id) {
1329            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1330                unreachable!()
1331            };
1332
1333            call.update_fields(update.fields, language_registry, &self.terminals, cx)?;
1334            call.status = status;
1335
1336            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1337        } else {
1338            let call = ToolCall::from_acp(
1339                update.try_into()?,
1340                status,
1341                language_registry,
1342                &self.terminals,
1343                cx,
1344            )?;
1345            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1346        };
1347
1348        self.resolve_locations(id, cx);
1349        Ok(())
1350    }
1351
1352    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1353        self.entries
1354            .iter()
1355            .enumerate()
1356            .rev()
1357            .find_map(|(index, entry)| {
1358                if let AgentThreadEntry::ToolCall(tool_call) = entry
1359                    && &tool_call.id == id
1360                {
1361                    Some(index)
1362                } else {
1363                    None
1364                }
1365            })
1366    }
1367
1368    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1369        // The tool call we are looking for is typically the last one, or very close to the end.
1370        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1371        self.entries
1372            .iter_mut()
1373            .enumerate()
1374            .rev()
1375            .find_map(|(index, tool_call)| {
1376                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1377                    && &tool_call.id == id
1378                {
1379                    Some((index, tool_call))
1380                } else {
1381                    None
1382                }
1383            })
1384    }
1385
1386    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1387        self.entries
1388            .iter()
1389            .enumerate()
1390            .rev()
1391            .find_map(|(index, tool_call)| {
1392                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1393                    && &tool_call.id == id
1394                {
1395                    Some((index, tool_call))
1396                } else {
1397                    None
1398                }
1399            })
1400    }
1401
1402    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1403        let project = self.project.clone();
1404        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1405            return;
1406        };
1407        let task = tool_call.resolve_locations(project, cx);
1408        cx.spawn(async move |this, cx| {
1409            let resolved_locations = task.await;
1410
1411            this.update(cx, |this, cx| {
1412                let project = this.project.clone();
1413
1414                for location in resolved_locations.iter().flatten() {
1415                    this.shared_buffers
1416                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1417                }
1418                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1419                    return;
1420                };
1421
1422                if let Some(Some(location)) = resolved_locations.last() {
1423                    project.update(cx, |project, cx| {
1424                        let should_ignore = if let Some(agent_location) = project.agent_location() {
1425                            let snapshot = location.buffer.read(cx).snapshot();
1426                            let old_position = agent_location.position.to_point(&snapshot);
1427                            let new_position = location.position.to_point(&snapshot);
1428                            agent_location.buffer == location.buffer
1429                                // ignore this so that when we get updates from the edit tool
1430                                // the position doesn't reset to the startof line
1431                                && (old_position.row == new_position.row
1432                                    && old_position.column > new_position.column)
1433                        } else {
1434                            false
1435                        };
1436                        if !should_ignore {
1437                            project.set_agent_location(Some(location.into()), cx);
1438                        }
1439                    });
1440                }
1441
1442                let resolved_locations = resolved_locations
1443                    .iter()
1444                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1445                    .collect::<Vec<_>>();
1446
1447                if tool_call.resolved_locations != resolved_locations {
1448                    tool_call.resolved_locations = resolved_locations;
1449                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1450                }
1451            })
1452        })
1453        .detach();
1454    }
1455
1456    pub fn request_tool_call_authorization(
1457        &mut self,
1458        tool_call: acp::ToolCallUpdate,
1459        options: Vec<acp::PermissionOption>,
1460        respect_always_allow_setting: bool,
1461        cx: &mut Context<Self>,
1462    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1463        let (tx, rx) = oneshot::channel();
1464
1465        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1466            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1467            // some tools would (incorrectly) continue to auto-accept.
1468            if let Some(allow_once_option) = options.iter().find_map(|option| {
1469                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1470                    Some(option.id.clone())
1471                } else {
1472                    None
1473                }
1474            }) {
1475                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1476                return Ok(async {
1477                    acp::RequestPermissionOutcome::Selected {
1478                        option_id: allow_once_option,
1479                    }
1480                }
1481                .boxed());
1482            }
1483        }
1484
1485        let status = ToolCallStatus::WaitingForConfirmation {
1486            options,
1487            respond_tx: tx,
1488        };
1489
1490        self.upsert_tool_call_inner(tool_call, status, cx)?;
1491        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1492
1493        let fut = async {
1494            match rx.await {
1495                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1496                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1497            }
1498        }
1499        .boxed();
1500
1501        Ok(fut)
1502    }
1503
1504    pub fn authorize_tool_call(
1505        &mut self,
1506        id: acp::ToolCallId,
1507        option_id: acp::PermissionOptionId,
1508        option_kind: acp::PermissionOptionKind,
1509        cx: &mut Context<Self>,
1510    ) {
1511        let Some((ix, call)) = self.tool_call_mut(&id) else {
1512            return;
1513        };
1514
1515        let new_status = match option_kind {
1516            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1517                ToolCallStatus::Rejected
1518            }
1519            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1520                ToolCallStatus::InProgress
1521            }
1522        };
1523
1524        let curr_status = mem::replace(&mut call.status, new_status);
1525
1526        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1527            respond_tx.send(option_id).log_err();
1528        } else if cfg!(debug_assertions) {
1529            panic!("tried to authorize an already authorized tool call");
1530        }
1531
1532        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1533    }
1534
1535    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1536        let mut first_tool_call = None;
1537
1538        for entry in self.entries.iter().rev() {
1539            match &entry {
1540                AgentThreadEntry::ToolCall(call) => {
1541                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1542                        first_tool_call = Some(call);
1543                    } else {
1544                        continue;
1545                    }
1546                }
1547                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1548                    // Reached the beginning of the turn.
1549                    // If we had pending permission requests in the previous turn, they have been cancelled.
1550                    break;
1551                }
1552            }
1553        }
1554
1555        first_tool_call
1556    }
1557
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1561
1562    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1563        let new_entries_len = request.entries.len();
1564        let mut new_entries = request.entries.into_iter();
1565
1566        // Reuse existing markdown to prevent flickering
1567        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1568            let PlanEntry {
1569                content,
1570                priority,
1571                status,
1572            } = old;
1573            content.update(cx, |old, cx| {
1574                old.replace(new.content, cx);
1575            });
1576            *priority = new.priority;
1577            *status = new.status;
1578        }
1579        for new in new_entries {
1580            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1581        }
1582        self.plan.entries.truncate(new_entries_len);
1583
1584        cx.notify();
1585    }
1586
    /// Removes every plan entry whose status is `Completed` and notifies
    /// observers (the notification is sent even if nothing was removed).
    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
        self.plan
            .entries
            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
        cx.notify();
    }
1593
    /// Test helper: sends `message` as a single plain-text content block via
    /// [`Self::send`] and returns its future.
    #[cfg(any(test, feature = "test-support"))]
    pub fn send_raw(
        &mut self,
        message: &str,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        self.send(
            vec![acp::ContentBlock::Text(acp::TextContent {
                text: message.to_string(),
                annotations: None,
                meta: None,
            })],
            cx,
        )
    }
1609
1610    pub fn send(
1611        &mut self,
1612        message: Vec<acp::ContentBlock>,
1613        cx: &mut Context<Self>,
1614    ) -> BoxFuture<'static, Result<()>> {
1615        let block = ContentBlock::new_combined(
1616            message.clone(),
1617            self.project.read(cx).languages().clone(),
1618            cx,
1619        );
1620        let request = acp::PromptRequest {
1621            prompt: message.clone(),
1622            session_id: self.session_id.clone(),
1623            meta: None,
1624        };
1625        let git_store = self.project.read(cx).git_store().clone();
1626
1627        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1628            Some(UserMessageId::new())
1629        } else {
1630            None
1631        };
1632
1633        self.run_turn(cx, async move |this, cx| {
1634            this.update(cx, |this, cx| {
1635                this.push_entry(
1636                    AgentThreadEntry::UserMessage(UserMessage {
1637                        id: message_id.clone(),
1638                        content: block,
1639                        chunks: message,
1640                        checkpoint: None,
1641                    }),
1642                    cx,
1643                );
1644            })
1645            .ok();
1646
1647            let old_checkpoint = git_store
1648                .update(cx, |git, cx| git.checkpoint(cx))?
1649                .await
1650                .context("failed to get old checkpoint")
1651                .log_err();
1652            this.update(cx, |this, cx| {
1653                if let Some((_ix, message)) = this.last_user_message() {
1654                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1655                        git_checkpoint,
1656                        show: false,
1657                    });
1658                }
1659                this.connection.prompt(message_id, request, cx)
1660            })?
1661            .await
1662        })
1663    }
1664
    /// Returns `true` when the connection's `resume` yields a handler for
    /// this session.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1668
    /// Starts a new turn that resumes the session through the connection's
    /// resume handler.
    ///
    /// # Errors
    ///
    /// The returned future fails with "resuming a session is not supported"
    /// when the connection provides no resume handler, and otherwise with
    /// whatever error the turn produces.
    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .resume(&this.session_id, cx)
                    // `None` here means the connection has no resume support.
                    .map(|resume| resume.run(cx))
            })?
            .context("resuming a session is not supported")?
            .await
        })
    }
1680
1681    fn run_turn(
1682        &mut self,
1683        cx: &mut Context<Self>,
1684        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1685    ) -> BoxFuture<'static, Result<()>> {
1686        self.clear_completed_plan_entries(cx);
1687
1688        let (tx, rx) = oneshot::channel();
1689        let cancel_task = self.cancel(cx);
1690
1691        self.send_task = Some(cx.spawn(async move |this, cx| {
1692            cancel_task.await;
1693            tx.send(f(this, cx).await).ok();
1694        }));
1695
1696        cx.spawn(async move |this, cx| {
1697            let response = rx.await;
1698
1699            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1700                .await?;
1701
1702            this.update(cx, |this, cx| {
1703                this.project
1704                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1705                match response {
1706                    Ok(Err(e)) => {
1707                        this.send_task.take();
1708                        cx.emit(AcpThreadEvent::Error);
1709                        Err(e)
1710                    }
1711                    result => {
1712                        let canceled = matches!(
1713                            result,
1714                            Ok(Ok(acp::PromptResponse {
1715                                stop_reason: acp::StopReason::Cancelled,
1716                                meta: None,
1717                            }))
1718                        );
1719
1720                        // We only take the task if the current prompt wasn't canceled.
1721                        //
1722                        // This prompt may have been canceled because another one was sent
1723                        // while it was still generating. In these cases, dropping `send_task`
1724                        // would cause the next generation to be canceled.
1725                        if !canceled {
1726                            this.send_task.take();
1727                        }
1728
1729                        // Handle refusal - distinguish between user prompt and tool call refusals
1730                        if let Ok(Ok(acp::PromptResponse {
1731                            stop_reason: acp::StopReason::Refusal,
1732                            meta: _,
1733                        })) = result
1734                        {
1735                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1736                                // Check if there's a completed tool call with results after the last user message
1737                                // This indicates the refusal is in response to tool output, not the user's prompt
1738                                let has_completed_tool_call_after_user_msg =
1739                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1740                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1741                                            // Check if the tool call has completed and has output
1742                                            matches!(tool_call.status, ToolCallStatus::Completed)
1743                                                && tool_call.raw_output.is_some()
1744                                        } else {
1745                                            false
1746                                        }
1747                                    });
1748
1749                                if has_completed_tool_call_after_user_msg {
1750                                    // Refusal is due to tool output - don't truncate, just notify
1751                                    // The model refused based on what the tool returned
1752                                    cx.emit(AcpThreadEvent::Refusal);
1753                                } else {
1754                                    // User prompt was refused - truncate back to before the user message
1755                                    let range = user_msg_ix..this.entries.len();
1756                                    if range.start < range.end {
1757                                        this.entries.truncate(user_msg_ix);
1758                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1759                                    }
1760                                    cx.emit(AcpThreadEvent::Refusal);
1761                                }
1762                            } else {
1763                                // No user message found, treat as general refusal
1764                                cx.emit(AcpThreadEvent::Refusal);
1765                            }
1766                        }
1767
1768                        cx.emit(AcpThreadEvent::Stopped);
1769                        Ok(())
1770                    }
1771                }
1772            })?
1773        })
1774        .boxed()
1775    }
1776
1777    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1778        let Some(send_task) = self.send_task.take() else {
1779            return Task::ready(());
1780        };
1781
1782        for entry in self.entries.iter_mut() {
1783            if let AgentThreadEntry::ToolCall(call) = entry {
1784                let cancel = matches!(
1785                    call.status,
1786                    ToolCallStatus::Pending
1787                        | ToolCallStatus::WaitingForConfirmation { .. }
1788                        | ToolCallStatus::InProgress
1789                );
1790
1791                if cancel {
1792                    call.status = ToolCallStatus::Canceled;
1793                }
1794            }
1795        }
1796
1797        self.connection.cancel(&self.session_id, cx);
1798
1799        // Wait for the send task to complete
1800        cx.foreground_executor().spawn(send_task)
1801    }
1802
1803    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1804    pub fn restore_checkpoint(
1805        &mut self,
1806        id: UserMessageId,
1807        cx: &mut Context<Self>,
1808    ) -> Task<Result<()>> {
1809        let Some((_, message)) = self.user_message_mut(&id) else {
1810            return Task::ready(Err(anyhow!("message not found")));
1811        };
1812
1813        let checkpoint = message
1814            .checkpoint
1815            .as_ref()
1816            .map(|c| c.git_checkpoint.clone());
1817        let rewind = self.rewind(id.clone(), cx);
1818        let git_store = self.project.read(cx).git_store().clone();
1819
1820        cx.spawn(async move |_, cx| {
1821            rewind.await?;
1822            if let Some(checkpoint) = checkpoint {
1823                git_store
1824                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1825                    .await?;
1826            }
1827
1828            Ok(())
1829        })
1830    }
1831
1832    /// Rewinds this thread to before the entry at `index`, removing it and all
1833    /// subsequent entries while rejecting any action_log changes made from that point.
1834    /// Unlike `restore_checkpoint`, this method does not restore from git.
1835    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1836        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1837            return Task::ready(Err(anyhow!("not supported")));
1838        };
1839
1840        cx.spawn(async move |this, cx| {
1841            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1842            this.update(cx, |this, cx| {
1843                if let Some((ix, _)) = this.user_message_mut(&id) {
1844                    let range = ix..this.entries.len();
1845                    this.entries.truncate(ix);
1846                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1847                }
1848                this.action_log()
1849                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1850            })?
1851            .await;
1852            Ok(())
1853        })
1854    }
1855
1856    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1857        let git_store = self.project.read(cx).git_store().clone();
1858
1859        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1860            if let Some(checkpoint) = message.checkpoint.as_ref() {
1861                checkpoint.git_checkpoint.clone()
1862            } else {
1863                return Task::ready(Ok(()));
1864            }
1865        } else {
1866            return Task::ready(Ok(()));
1867        };
1868
1869        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1870        cx.spawn(async move |this, cx| {
1871            let new_checkpoint = new_checkpoint
1872                .await
1873                .context("failed to get new checkpoint")
1874                .log_err();
1875            if let Some(new_checkpoint) = new_checkpoint {
1876                let equal = git_store
1877                    .update(cx, |git, cx| {
1878                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1879                    })?
1880                    .await
1881                    .unwrap_or(true);
1882                this.update(cx, |this, cx| {
1883                    let (ix, message) = this.last_user_message().context("no user message")?;
1884                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1885                    checkpoint.show = !equal;
1886                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1887                    anyhow::Ok(())
1888                })??;
1889            }
1890
1891            Ok(())
1892        })
1893    }
1894
1895    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1896        self.entries
1897            .iter_mut()
1898            .enumerate()
1899            .rev()
1900            .find_map(|(ix, entry)| {
1901                if let AgentThreadEntry::UserMessage(message) = entry {
1902                    Some((ix, message))
1903                } else {
1904                    None
1905                }
1906            })
1907    }
1908
1909    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1910        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1911            if let AgentThreadEntry::UserMessage(message) = entry {
1912                if message.id.as_ref() == Some(id) {
1913                    Some((ix, message))
1914                } else {
1915                    None
1916                }
1917            } else {
1918                None
1919            }
1920        })
1921    }
1922
1923    pub fn read_text_file(
1924        &self,
1925        path: PathBuf,
1926        line: Option<u32>,
1927        limit: Option<u32>,
1928        reuse_shared_snapshot: bool,
1929        cx: &mut Context<Self>,
1930    ) -> Task<Result<String, acp::Error>> {
1931        // Args are 1-based, move to 0-based
1932        let line = line.unwrap_or_default().saturating_sub(1);
1933        let limit = limit.unwrap_or(u32::MAX);
1934        let project = self.project.clone();
1935        let action_log = self.action_log.clone();
1936        cx.spawn(async move |this, cx| {
1937            let load = project
1938                .update(cx, |project, cx| {
1939                    let path = project
1940                        .project_path_for_absolute_path(&path, cx)
1941                        .ok_or_else(|| {
1942                            acp::Error::resource_not_found(Some(path.display().to_string()))
1943                        })?;
1944                    Ok(project.open_buffer(path, cx))
1945                })
1946                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1947                .flatten()?;
1948
1949            let buffer = load.await?;
1950
1951            let snapshot = if reuse_shared_snapshot {
1952                this.read_with(cx, |this, _| {
1953                    this.shared_buffers.get(&buffer.clone()).cloned()
1954                })
1955                .log_err()
1956                .flatten()
1957            } else {
1958                None
1959            };
1960
1961            let snapshot = if let Some(snapshot) = snapshot {
1962                snapshot
1963            } else {
1964                action_log.update(cx, |action_log, cx| {
1965                    action_log.buffer_read(buffer.clone(), cx);
1966                })?;
1967
1968                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
1969                this.update(cx, |this, _| {
1970                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
1971                })?;
1972                snapshot
1973            };
1974
1975            let max_point = snapshot.max_point();
1976            let start_position = Point::new(line, 0);
1977
1978            if start_position > max_point {
1979                return Err(acp::Error::invalid_params().with_data(format!(
1980                    "Attempting to read beyond the end of the file, line {}:{}",
1981                    max_point.row + 1,
1982                    max_point.column
1983                )));
1984            }
1985
1986            let start = snapshot.anchor_before(start_position);
1987            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
1988
1989            project.update(cx, |project, cx| {
1990                project.set_agent_location(
1991                    Some(AgentLocation {
1992                        buffer: buffer.downgrade(),
1993                        position: start,
1994                    }),
1995                    cx,
1996                );
1997            })?;
1998
1999            Ok(snapshot.text_for_range(start..end).collect::<String>())
2000        })
2001    }
2002
2003    pub fn write_text_file(
2004        &self,
2005        path: PathBuf,
2006        content: String,
2007        cx: &mut Context<Self>,
2008    ) -> Task<Result<()>> {
2009        let project = self.project.clone();
2010        let action_log = self.action_log.clone();
2011        cx.spawn(async move |this, cx| {
2012            let load = project.update(cx, |project, cx| {
2013                let path = project
2014                    .project_path_for_absolute_path(&path, cx)
2015                    .context("invalid path")?;
2016                anyhow::Ok(project.open_buffer(path, cx))
2017            });
2018            let buffer = load??.await?;
2019            let snapshot = this.update(cx, |this, cx| {
2020                this.shared_buffers
2021                    .get(&buffer)
2022                    .cloned()
2023                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2024            })?;
2025            let edits = cx
2026                .background_executor()
2027                .spawn(async move {
2028                    let old_text = snapshot.text();
2029                    text_diff(old_text.as_str(), &content)
2030                        .into_iter()
2031                        .map(|(range, replacement)| {
2032                            (
2033                                snapshot.anchor_after(range.start)
2034                                    ..snapshot.anchor_before(range.end),
2035                                replacement,
2036                            )
2037                        })
2038                        .collect::<Vec<_>>()
2039                })
2040                .await;
2041
2042            project.update(cx, |project, cx| {
2043                project.set_agent_location(
2044                    Some(AgentLocation {
2045                        buffer: buffer.downgrade(),
2046                        position: edits
2047                            .last()
2048                            .map(|(range, _)| range.end)
2049                            .unwrap_or(Anchor::MIN),
2050                    }),
2051                    cx,
2052                );
2053            })?;
2054
2055            let format_on_save = cx.update(|cx| {
2056                action_log.update(cx, |action_log, cx| {
2057                    action_log.buffer_read(buffer.clone(), cx);
2058                });
2059
2060                let format_on_save = buffer.update(cx, |buffer, cx| {
2061                    buffer.edit(edits, None, cx);
2062
2063                    let settings = language::language_settings::language_settings(
2064                        buffer.language().map(|l| l.name()),
2065                        buffer.file(),
2066                        cx,
2067                    );
2068
2069                    settings.format_on_save != FormatOnSave::Off
2070                });
2071                action_log.update(cx, |action_log, cx| {
2072                    action_log.buffer_edited(buffer.clone(), cx);
2073                });
2074                format_on_save
2075            })?;
2076
2077            if format_on_save {
2078                let format_task = project.update(cx, |project, cx| {
2079                    project.format(
2080                        HashSet::from_iter([buffer.clone()]),
2081                        LspFormatTarget::Buffers,
2082                        false,
2083                        FormatTrigger::Save,
2084                        cx,
2085                    )
2086                })?;
2087                format_task.await.log_err();
2088
2089                action_log.update(cx, |action_log, cx| {
2090                    action_log.buffer_edited(buffer.clone(), cx);
2091                })?;
2092            }
2093
2094            project
2095                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2096                .await
2097        })
2098    }
2099
2100    pub fn create_terminal(
2101        &self,
2102        command: String,
2103        args: Vec<String>,
2104        extra_env: Vec<acp::EnvVariable>,
2105        cwd: Option<PathBuf>,
2106        output_byte_limit: Option<u64>,
2107        cx: &mut Context<Self>,
2108    ) -> Task<Result<Entity<Terminal>>> {
2109        let env = match &cwd {
2110            Some(dir) => self.project.update(cx, |project, cx| {
2111                let worktree = project.find_worktree(dir.as_path(), cx);
2112                let shell = TerminalSettings::get(
2113                    worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2114                        worktree_id: worktree.read(cx).id(),
2115                        path: &path,
2116                    }),
2117                    cx,
2118                )
2119                .shell
2120                .clone();
2121                project.directory_environment(&shell, dir.as_path().into(), cx)
2122            }),
2123            None => Task::ready(None).shared(),
2124        };
2125        let env = cx.spawn(async move |_, _| {
2126            let mut env = env.await.unwrap_or_default();
2127            // Disables paging for `git` and hopefully other commands
2128            env.insert("PAGER".into(), "".into());
2129            for var in extra_env {
2130                env.insert(var.name, var.value);
2131            }
2132            env
2133        });
2134
2135        let project = self.project.clone();
2136        let language_registry = project.read(cx).languages().clone();
2137        let is_windows = project.read(cx).path_style(cx).is_windows();
2138
2139        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2140        let terminal_task = cx.spawn({
2141            let terminal_id = terminal_id.clone();
2142            async move |_this, cx| {
2143                let env = env.await;
2144                let shell = project
2145                    .update(cx, |project, cx| {
2146                        project
2147                            .remote_client()
2148                            .and_then(|r| r.read(cx).default_system_shell())
2149                    })?
2150                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2151                let (task_command, task_args) =
2152                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2153                        .redirect_stdin_to_dev_null()
2154                        .build(Some(command.clone()), &args);
2155                let terminal = project
2156                    .update(cx, |project, cx| {
2157                        project.create_terminal_task(
2158                            task::SpawnInTerminal {
2159                                command: Some(task_command),
2160                                args: task_args,
2161                                cwd: cwd.clone(),
2162                                env,
2163                                ..Default::default()
2164                            },
2165                            cx,
2166                        )
2167                    })?
2168                    .await?;
2169
2170                cx.new(|cx| {
2171                    Terminal::new(
2172                        terminal_id,
2173                        &format!("{} {}", command, args.join(" ")),
2174                        cwd,
2175                        output_byte_limit.map(|l| l as usize),
2176                        terminal,
2177                        language_registry,
2178                        cx,
2179                    )
2180                })
2181            }
2182        });
2183
2184        cx.spawn(async move |this, cx| {
2185            let terminal = terminal_task.await?;
2186            this.update(cx, |this, _cx| {
2187                this.terminals.insert(terminal_id, terminal.clone());
2188                terminal
2189            })
2190        })
2191    }
2192
2193    pub fn kill_terminal(
2194        &mut self,
2195        terminal_id: acp::TerminalId,
2196        cx: &mut Context<Self>,
2197    ) -> Result<()> {
2198        self.terminals
2199            .get(&terminal_id)
2200            .context("Terminal not found")?
2201            .update(cx, |terminal, cx| {
2202                terminal.kill(cx);
2203            });
2204
2205        Ok(())
2206    }
2207
2208    pub fn release_terminal(
2209        &mut self,
2210        terminal_id: acp::TerminalId,
2211        cx: &mut Context<Self>,
2212    ) -> Result<()> {
2213        self.terminals
2214            .remove(&terminal_id)
2215            .context("Terminal not found")?
2216            .update(cx, |terminal, cx| {
2217                terminal.kill(cx);
2218            });
2219
2220        Ok(())
2221    }
2222
2223    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2224        self.terminals
2225            .get(&terminal_id)
2226            .context("Terminal not found")
2227            .cloned()
2228    }
2229
2230    pub fn to_markdown(&self, cx: &App) -> String {
2231        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2232    }
2233
2234    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2235        cx.emit(AcpThreadEvent::LoadError(error));
2236    }
2237
2238    pub fn register_terminal_created(
2239        &mut self,
2240        terminal_id: acp::TerminalId,
2241        command_label: String,
2242        working_dir: Option<PathBuf>,
2243        output_byte_limit: Option<u64>,
2244        terminal: Entity<::terminal::Terminal>,
2245        cx: &mut Context<Self>,
2246    ) -> Entity<Terminal> {
2247        let language_registry = self.project.read(cx).languages().clone();
2248
2249        let entity = cx.new(|cx| {
2250            Terminal::new(
2251                terminal_id.clone(),
2252                &command_label,
2253                working_dir.clone(),
2254                output_byte_limit.map(|l| l as usize),
2255                terminal,
2256                language_registry,
2257                cx,
2258            )
2259        });
2260        self.terminals.insert(terminal_id.clone(), entity.clone());
2261        entity
2262    }
2263}
2264
2265fn markdown_for_raw_output(
2266    raw_output: &serde_json::Value,
2267    language_registry: &Arc<LanguageRegistry>,
2268    cx: &mut App,
2269) -> Option<Entity<Markdown>> {
2270    match raw_output {
2271        serde_json::Value::Null => None,
2272        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2273            Markdown::new(
2274                value.to_string().into(),
2275                Some(language_registry.clone()),
2276                None,
2277                cx,
2278            )
2279        })),
2280        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2281            Markdown::new(
2282                value.to_string().into(),
2283                Some(language_registry.clone()),
2284                None,
2285                cx,
2286            )
2287        })),
2288        serde_json::Value::String(value) => Some(cx.new(|cx| {
2289            Markdown::new(
2290                value.clone().into(),
2291                Some(language_registry.clone()),
2292                None,
2293                cx,
2294            )
2295        })),
2296        value => Some(cx.new(|cx| {
2297            Markdown::new(
2298                format!("```json\n{}\n```", value).into(),
2299                Some(language_registry.clone()),
2300                None,
2301                cx,
2302            )
2303        })),
2304    }
2305}
2306
2307#[cfg(test)]
2308mod tests {
2309    use super::*;
2310    use anyhow::anyhow;
2311    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2312    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2313    use indoc::indoc;
2314    use project::{FakeFs, Fs};
2315    use rand::{distr, prelude::*};
2316    use serde_json::json;
2317    use settings::SettingsStore;
2318    use smol::stream::StreamExt as _;
2319    use std::{
2320        any::Any,
2321        cell::RefCell,
2322        path::Path,
2323        rc::Rc,
2324        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2325        time::Duration,
2326    };
2327    use util::path;
2328
2329    fn init_test(cx: &mut TestAppContext) {
2330        env_logger::try_init().ok();
2331        cx.update(|cx| {
2332            let settings_store = SettingsStore::test(cx);
2333            cx.set_global(settings_store);
2334            Project::init_settings(cx);
2335            language::init(cx);
2336        });
2337    }
2338
2339    #[gpui::test]
2340    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2341        init_test(cx);
2342
2343        let fs = FakeFs::new(cx.executor());
2344        let project = Project::test(fs, [], cx).await;
2345        let connection = Rc::new(FakeAgentConnection::new());
2346        let thread = cx
2347            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2348            .await
2349            .unwrap();
2350
2351        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2352
2353        // Send Output BEFORE Created - should be buffered by acp_thread
2354        thread.update(cx, |thread, cx| {
2355            thread.on_terminal_provider_event(
2356                TerminalProviderEvent::Output {
2357                    terminal_id: terminal_id.clone(),
2358                    data: b"hello buffered".to_vec(),
2359                },
2360                cx,
2361            );
2362        });
2363
2364        // Create a display-only terminal and then send Created
2365        let lower = cx.new(|cx| {
2366            let builder = ::terminal::TerminalBuilder::new_display_only(
2367                ::terminal::terminal_settings::CursorShape::default(),
2368                ::terminal::terminal_settings::AlternateScroll::On,
2369                None,
2370                0,
2371            )
2372            .unwrap();
2373            builder.subscribe(cx)
2374        });
2375
2376        thread.update(cx, |thread, cx| {
2377            thread.on_terminal_provider_event(
2378                TerminalProviderEvent::Created {
2379                    terminal_id: terminal_id.clone(),
2380                    label: "Buffered Test".to_string(),
2381                    cwd: None,
2382                    output_byte_limit: None,
2383                    terminal: lower.clone(),
2384                },
2385                cx,
2386            );
2387        });
2388
2389        // After Created, buffered Output should have been flushed into the renderer
2390        let content = thread.read_with(cx, |thread, cx| {
2391            let term = thread.terminal(terminal_id.clone()).unwrap();
2392            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2393        });
2394
2395        assert!(
2396            content.contains("hello buffered"),
2397            "expected buffered output to render, got: {content}"
2398        );
2399    }
2400
2401    #[gpui::test]
2402    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2403        init_test(cx);
2404
2405        let fs = FakeFs::new(cx.executor());
2406        let project = Project::test(fs, [], cx).await;
2407        let connection = Rc::new(FakeAgentConnection::new());
2408        let thread = cx
2409            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2410            .await
2411            .unwrap();
2412
2413        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2414
2415        // Send Output BEFORE Created
2416        thread.update(cx, |thread, cx| {
2417            thread.on_terminal_provider_event(
2418                TerminalProviderEvent::Output {
2419                    terminal_id: terminal_id.clone(),
2420                    data: b"pre-exit data".to_vec(),
2421                },
2422                cx,
2423            );
2424        });
2425
2426        // Send Exit BEFORE Created
2427        thread.update(cx, |thread, cx| {
2428            thread.on_terminal_provider_event(
2429                TerminalProviderEvent::Exit {
2430                    terminal_id: terminal_id.clone(),
2431                    status: acp::TerminalExitStatus {
2432                        exit_code: Some(0),
2433                        signal: None,
2434                        meta: None,
2435                    },
2436                },
2437                cx,
2438            );
2439        });
2440
2441        // Now create a display-only lower-level terminal and send Created
2442        let lower = cx.new(|cx| {
2443            let builder = ::terminal::TerminalBuilder::new_display_only(
2444                ::terminal::terminal_settings::CursorShape::default(),
2445                ::terminal::terminal_settings::AlternateScroll::On,
2446                None,
2447                0,
2448            )
2449            .unwrap();
2450            builder.subscribe(cx)
2451        });
2452
2453        thread.update(cx, |thread, cx| {
2454            thread.on_terminal_provider_event(
2455                TerminalProviderEvent::Created {
2456                    terminal_id: terminal_id.clone(),
2457                    label: "Buffered Exit Test".to_string(),
2458                    cwd: None,
2459                    output_byte_limit: None,
2460                    terminal: lower.clone(),
2461                },
2462                cx,
2463            );
2464        });
2465
2466        // Output should be present after Created (flushed from buffer)
2467        let content = thread.read_with(cx, |thread, cx| {
2468            let term = thread.terminal(terminal_id.clone()).unwrap();
2469            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2470        });
2471
2472        assert!(
2473            content.contains("pre-exit data"),
2474            "expected pre-exit data to render, got: {content}"
2475        );
2476    }
2477
2478    #[gpui::test]
2479    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2480        init_test(cx);
2481
2482        let fs = FakeFs::new(cx.executor());
2483        let project = Project::test(fs, [], cx).await;
2484        let connection = Rc::new(FakeAgentConnection::new());
2485        let thread = cx
2486            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2487            .await
2488            .unwrap();
2489
2490        // Test creating a new user message
2491        thread.update(cx, |thread, cx| {
2492            thread.push_user_content_block(
2493                None,
2494                acp::ContentBlock::Text(acp::TextContent {
2495                    annotations: None,
2496                    text: "Hello, ".to_string(),
2497                    meta: None,
2498                }),
2499                cx,
2500            );
2501        });
2502
2503        thread.update(cx, |thread, cx| {
2504            assert_eq!(thread.entries.len(), 1);
2505            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2506                assert_eq!(user_msg.id, None);
2507                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2508            } else {
2509                panic!("Expected UserMessage");
2510            }
2511        });
2512
2513        // Test appending to existing user message
2514        let message_1_id = UserMessageId::new();
2515        thread.update(cx, |thread, cx| {
2516            thread.push_user_content_block(
2517                Some(message_1_id.clone()),
2518                acp::ContentBlock::Text(acp::TextContent {
2519                    annotations: None,
2520                    text: "world!".to_string(),
2521                    meta: None,
2522                }),
2523                cx,
2524            );
2525        });
2526
2527        thread.update(cx, |thread, cx| {
2528            assert_eq!(thread.entries.len(), 1);
2529            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2530                assert_eq!(user_msg.id, Some(message_1_id));
2531                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2532            } else {
2533                panic!("Expected UserMessage");
2534            }
2535        });
2536
2537        // Test creating new user message after assistant message
2538        thread.update(cx, |thread, cx| {
2539            thread.push_assistant_content_block(
2540                acp::ContentBlock::Text(acp::TextContent {
2541                    annotations: None,
2542                    text: "Assistant response".to_string(),
2543                    meta: None,
2544                }),
2545                false,
2546                cx,
2547            );
2548        });
2549
2550        let message_2_id = UserMessageId::new();
2551        thread.update(cx, |thread, cx| {
2552            thread.push_user_content_block(
2553                Some(message_2_id.clone()),
2554                acp::ContentBlock::Text(acp::TextContent {
2555                    annotations: None,
2556                    text: "New user message".to_string(),
2557                    meta: None,
2558                }),
2559                cx,
2560            );
2561        });
2562
2563        thread.update(cx, |thread, cx| {
2564            assert_eq!(thread.entries.len(), 3);
2565            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2566                assert_eq!(user_msg.id, Some(message_2_id));
2567                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2568            } else {
2569                panic!("Expected UserMessage at index 2");
2570            }
2571        });
2572    }
2573
2574    #[gpui::test]
2575    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2576        init_test(cx);
2577
2578        let fs = FakeFs::new(cx.executor());
2579        let project = Project::test(fs, [], cx).await;
2580        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2581            |_, thread, mut cx| {
2582                async move {
2583                    thread.update(&mut cx, |thread, cx| {
2584                        thread
2585                            .handle_session_update(
2586                                acp::SessionUpdate::AgentThoughtChunk {
2587                                    content: "Thinking ".into(),
2588                                },
2589                                cx,
2590                            )
2591                            .unwrap();
2592                        thread
2593                            .handle_session_update(
2594                                acp::SessionUpdate::AgentThoughtChunk {
2595                                    content: "hard!".into(),
2596                                },
2597                                cx,
2598                            )
2599                            .unwrap();
2600                    })?;
2601                    Ok(acp::PromptResponse {
2602                        stop_reason: acp::StopReason::EndTurn,
2603                        meta: None,
2604                    })
2605                }
2606                .boxed_local()
2607            },
2608        ));
2609
2610        let thread = cx
2611            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2612            .await
2613            .unwrap();
2614
2615        thread
2616            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2617            .await
2618            .unwrap();
2619
2620        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2621        assert_eq!(
2622            output,
2623            indoc! {r#"
2624            ## User
2625
2626            Hello from Zed!
2627
2628            ## Assistant
2629
2630            <thinking>
2631            Thinking hard!
2632            </thinking>
2633
2634            "#}
2635        );
2636    }
2637
    /// Checks that an agent's file write is combined with an edit the user
    /// makes to the same buffer before the agent's turn has been awaited.
    ///
    /// The fake agent reads `/tmp/foo`, signals the test through a oneshot
    /// channel, and then writes the file back with two extra lines appended.
    /// After the signal the test inserts `zero\n` at the start of the open
    /// buffer; both the buffer and the file on disk must end up containing the
    /// user's line as well as the agent's lines.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // The sender is kept in an `Rc<RefCell<Option<_>>>` so the handler
        // closure can take it out and consume it when it runs.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    // Step 1: the agent reads the file as it currently is.
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    // Step 2: tell the test body that the read has completed.
                    read_file_tx.take().unwrap().send(()).unwrap();
                    // Step 3: write back the content with two lines appended.
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                        meta: None,
                    })
                }
                .boxed_local()
            },
        ));

        // Open the file as a buffer so the test can edit it like a user would.
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Start the turn but do not await it yet; first wait for the agent's
        // read signal, then make the user's edit.
        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        // Both the in-memory buffer and the file on disk must contain the
        // user's inserted line together with the agent's appended lines.
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
2717
2718    #[gpui::test]
2719    async fn test_reading_from_line(cx: &mut TestAppContext) {
2720        init_test(cx);
2721
2722        let fs = FakeFs::new(cx.executor());
2723        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2724            .await;
2725        let project = Project::test(fs.clone(), [], cx).await;
2726        project
2727            .update(cx, |project, cx| {
2728                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2729            })
2730            .await
2731            .unwrap();
2732
2733        let connection = Rc::new(FakeAgentConnection::new());
2734
2735        let thread = cx
2736            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2737            .await
2738            .unwrap();
2739
2740        // Whole file
2741        let content = thread
2742            .update(cx, |thread, cx| {
2743                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2744            })
2745            .await
2746            .unwrap();
2747
2748        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2749
2750        // Only start line
2751        let content = thread
2752            .update(cx, |thread, cx| {
2753                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2754            })
2755            .await
2756            .unwrap();
2757
2758        assert_eq!(content, "three\nfour\n");
2759
2760        // Only limit
2761        let content = thread
2762            .update(cx, |thread, cx| {
2763                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2764            })
2765            .await
2766            .unwrap();
2767
2768        assert_eq!(content, "one\ntwo\n");
2769
2770        // Range
2771        let content = thread
2772            .update(cx, |thread, cx| {
2773                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2774            })
2775            .await
2776            .unwrap();
2777
2778        assert_eq!(content, "two\nthree\n");
2779
2780        // Invalid
2781        let err = thread
2782            .update(cx, |thread, cx| {
2783                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2784            })
2785            .await
2786            .unwrap_err();
2787
2788        assert_eq!(
2789            err.to_string(),
2790            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2791        );
2792    }
2793
2794    #[gpui::test]
2795    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2796        init_test(cx);
2797
2798        let fs = FakeFs::new(cx.executor());
2799        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2800        let project = Project::test(fs.clone(), [], cx).await;
2801        project
2802            .update(cx, |project, cx| {
2803                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2804            })
2805            .await
2806            .unwrap();
2807
2808        let connection = Rc::new(FakeAgentConnection::new());
2809
2810        let thread = cx
2811            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2812            .await
2813            .unwrap();
2814
2815        // Whole file
2816        let content = thread
2817            .update(cx, |thread, cx| {
2818                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2819            })
2820            .await
2821            .unwrap();
2822
2823        assert_eq!(content, "");
2824
2825        // Only start line
2826        let content = thread
2827            .update(cx, |thread, cx| {
2828                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2829            })
2830            .await
2831            .unwrap();
2832
2833        assert_eq!(content, "");
2834
2835        // Only limit
2836        let content = thread
2837            .update(cx, |thread, cx| {
2838                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2839            })
2840            .await
2841            .unwrap();
2842
2843        assert_eq!(content, "");
2844
2845        // Range
2846        let content = thread
2847            .update(cx, |thread, cx| {
2848                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2849            })
2850            .await
2851            .unwrap();
2852
2853        assert_eq!(content, "");
2854
2855        // Invalid
2856        let err = thread
2857            .update(cx, |thread, cx| {
2858                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2859            })
2860            .await
2861            .unwrap_err();
2862
2863        assert_eq!(
2864            err.to_string(),
2865            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2866        );
2867    }
2868    #[gpui::test]
2869    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2870        init_test(cx);
2871
2872        let fs = FakeFs::new(cx.executor());
2873        fs.insert_tree(path!("/tmp"), json!({})).await;
2874        let project = Project::test(fs.clone(), [], cx).await;
2875        project
2876            .update(cx, |project, cx| {
2877                project.find_or_create_worktree(path!("/tmp"), true, cx)
2878            })
2879            .await
2880            .unwrap();
2881
2882        let connection = Rc::new(FakeAgentConnection::new());
2883
2884        let thread = cx
2885            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2886            .await
2887            .unwrap();
2888
2889        // Out of project file
2890        let err = thread
2891            .update(cx, |thread, cx| {
2892                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2893            })
2894            .await
2895            .unwrap_err();
2896
2897        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2898    }
2899
2900    #[gpui::test]
2901    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2902        init_test(cx);
2903
2904        let fs = FakeFs::new(cx.executor());
2905        let project = Project::test(fs, [], cx).await;
2906        let id = acp::ToolCallId("test".into());
2907
2908        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2909            let id = id.clone();
2910            move |_, thread, mut cx| {
2911                let id = id.clone();
2912                async move {
2913                    thread
2914                        .update(&mut cx, |thread, cx| {
2915                            thread.handle_session_update(
2916                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2917                                    id: id.clone(),
2918                                    title: "Label".into(),
2919                                    kind: acp::ToolKind::Fetch,
2920                                    status: acp::ToolCallStatus::InProgress,
2921                                    content: vec![],
2922                                    locations: vec![],
2923                                    raw_input: None,
2924                                    raw_output: None,
2925                                    meta: None,
2926                                }),
2927                                cx,
2928                            )
2929                        })
2930                        .unwrap()
2931                        .unwrap();
2932                    Ok(acp::PromptResponse {
2933                        stop_reason: acp::StopReason::EndTurn,
2934                        meta: None,
2935                    })
2936                }
2937                .boxed_local()
2938            }
2939        }));
2940
2941        let thread = cx
2942            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2943            .await
2944            .unwrap();
2945
2946        let request = thread.update(cx, |thread, cx| {
2947            thread.send_raw("Fetch https://example.com", cx)
2948        });
2949
2950        run_until_first_tool_call(&thread, cx).await;
2951
2952        thread.read_with(cx, |thread, _| {
2953            assert!(matches!(
2954                thread.entries[1],
2955                AgentThreadEntry::ToolCall(ToolCall {
2956                    status: ToolCallStatus::InProgress,
2957                    ..
2958                })
2959            ));
2960        });
2961
2962        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2963
2964        thread.read_with(cx, |thread, _| {
2965            assert!(matches!(
2966                &thread.entries[1],
2967                AgentThreadEntry::ToolCall(ToolCall {
2968                    status: ToolCallStatus::Canceled,
2969                    ..
2970                })
2971            ));
2972        });
2973
2974        thread
2975            .update(cx, |thread, cx| {
2976                thread.handle_session_update(
2977                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2978                        id,
2979                        fields: acp::ToolCallUpdateFields {
2980                            status: Some(acp::ToolCallStatus::Completed),
2981                            ..Default::default()
2982                        },
2983                        meta: None,
2984                    }),
2985                    cx,
2986                )
2987            })
2988            .unwrap();
2989
2990        request.await.unwrap();
2991
2992        thread.read_with(cx, |thread, _| {
2993            assert!(matches!(
2994                thread.entries[1],
2995                AgentThreadEntry::ToolCall(ToolCall {
2996                    status: ToolCallStatus::Completed,
2997                    ..
2998                })
2999            ));
3000        });
3001    }
3002
3003    #[gpui::test]
3004    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3005        init_test(cx);
3006        let fs = FakeFs::new(cx.background_executor.clone());
3007        fs.insert_tree(path!("/test"), json!({})).await;
3008        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3009
3010        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3011            move |_, thread, mut cx| {
3012                async move {
3013                    thread
3014                        .update(&mut cx, |thread, cx| {
3015                            thread.handle_session_update(
3016                                acp::SessionUpdate::ToolCall(acp::ToolCall {
3017                                    id: acp::ToolCallId("test".into()),
3018                                    title: "Label".into(),
3019                                    kind: acp::ToolKind::Edit,
3020                                    status: acp::ToolCallStatus::Completed,
3021                                    content: vec![acp::ToolCallContent::Diff {
3022                                        diff: acp::Diff {
3023                                            path: "/test/test.txt".into(),
3024                                            old_text: None,
3025                                            new_text: "foo".into(),
3026                                            meta: None,
3027                                        },
3028                                    }],
3029                                    locations: vec![],
3030                                    raw_input: None,
3031                                    raw_output: None,
3032                                    meta: None,
3033                                }),
3034                                cx,
3035                            )
3036                        })
3037                        .unwrap()
3038                        .unwrap();
3039                    Ok(acp::PromptResponse {
3040                        stop_reason: acp::StopReason::EndTurn,
3041                        meta: None,
3042                    })
3043                }
3044                .boxed_local()
3045            }
3046        }));
3047
3048        let thread = cx
3049            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3050            .await
3051            .unwrap();
3052
3053        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3054            .await
3055            .unwrap();
3056
3057        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3058    }
3059
    /// Exercises checkpoint bookkeeping across several turns.
    ///
    /// The fake agent optionally creates a new file on each prompt and echoes
    /// the prompt back in upper case. The test asserts that turns which
    /// created a file are rendered as `## User (checkpoint)`, that a turn
    /// without file changes is rendered as plain `## User`, and that
    /// `restore_checkpoint` drops the restored message and everything after
    /// it while removing the file created during that turn.
    #[gpui::test(iterations = 10)]
    async fn test_checkpoints(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(
            path!("/test"),
            json!({
                ".git": {}
            }),
        )
        .await;
        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

        // `simulate_changes` toggles whether the agent touches the file system;
        // `next_filename` numbers the files it creates (file-0, file-1, ...).
        let simulate_changes = Arc::new(AtomicBool::new(true));
        let next_filename = Arc::new(AtomicUsize::new(0));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            let fs = fs.clone();
            move |request, thread, mut cx| {
                let fs = fs.clone();
                let simulate_changes = simulate_changes.clone();
                let next_filename = next_filename.clone();
                async move {
                    if simulate_changes.load(SeqCst) {
                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                        fs.write(Path::new(&filename), b"").await?;
                    }

                    // Reply with the upper-cased text of the first prompt block.
                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                        panic!("expected text content block");
                    };
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentMessageChunk {
                                    content: content.text.to_uppercase().into(),
                                },
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                        meta: None,
                    })
                }
                .boxed_local()
            }
        }));
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // First turn: the agent creates file-0, so the message gets a checkpoint.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

        // Second turn: the agent creates file-1, so this message gets a checkpoint too.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Checkpoint isn't stored when there are no changes.
        simulate_changes.store(false, SeqCst);
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                    ## User

                    dolor

                    ## Assistant

                    DOLOR

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Rewinding the conversation truncates the history and restores the checkpoint.
        // Entry 2 is the second user message ("ipsum").
        thread
            .update(cx, |thread, cx| {
                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                    panic!("unexpected entries {:?}", thread.entries)
                };
                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
            })
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
    }
3240
3241    #[gpui::test]
3242    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3243        use std::sync::atomic::AtomicUsize;
3244        init_test(cx);
3245
3246        let fs = FakeFs::new(cx.executor());
3247        let project = Project::test(fs, None, cx).await;
3248
3249        // Create a connection that simulates refusal after tool result
3250        let prompt_count = Arc::new(AtomicUsize::new(0));
3251        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3252            let prompt_count = prompt_count.clone();
3253            move |_request, thread, mut cx| {
3254                let count = prompt_count.fetch_add(1, SeqCst);
3255                async move {
3256                    if count == 0 {
3257                        // First prompt: Generate a tool call with result
3258                        thread.update(&mut cx, |thread, cx| {
3259                            thread
3260                                .handle_session_update(
3261                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3262                                        id: acp::ToolCallId("tool1".into()),
3263                                        title: "Test Tool".into(),
3264                                        kind: acp::ToolKind::Fetch,
3265                                        status: acp::ToolCallStatus::Completed,
3266                                        content: vec![],
3267                                        locations: vec![],
3268                                        raw_input: Some(serde_json::json!({"query": "test"})),
3269                                        raw_output: Some(
3270                                            serde_json::json!({"result": "inappropriate content"}),
3271                                        ),
3272                                        meta: None,
3273                                    }),
3274                                    cx,
3275                                )
3276                                .unwrap();
3277                        })?;
3278
3279                        // Now return refusal because of the tool result
3280                        Ok(acp::PromptResponse {
3281                            stop_reason: acp::StopReason::Refusal,
3282                            meta: None,
3283                        })
3284                    } else {
3285                        Ok(acp::PromptResponse {
3286                            stop_reason: acp::StopReason::EndTurn,
3287                            meta: None,
3288                        })
3289                    }
3290                }
3291                .boxed_local()
3292            }
3293        }));
3294
3295        let thread = cx
3296            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3297            .await
3298            .unwrap();
3299
3300        // Track if we see a Refusal event
3301        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3302        let saw_refusal_event_captured = saw_refusal_event.clone();
3303        thread.update(cx, |_thread, cx| {
3304            cx.subscribe(
3305                &thread,
3306                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3307                    if matches!(event, AcpThreadEvent::Refusal) {
3308                        *saw_refusal_event_captured.lock().unwrap() = true;
3309                    }
3310                },
3311            )
3312            .detach();
3313        });
3314
3315        // Send a user message - this will trigger tool call and then refusal
3316        let send_task = thread.update(cx, |thread, cx| {
3317            thread.send(
3318                vec![acp::ContentBlock::Text(acp::TextContent {
3319                    text: "Hello".into(),
3320                    annotations: None,
3321                    meta: None,
3322                })],
3323                cx,
3324            )
3325        });
3326        cx.background_executor.spawn(send_task).detach();
3327        cx.run_until_parked();
3328
3329        // Verify that:
3330        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3331        // 2. The user message was NOT truncated
3332        assert!(
3333            *saw_refusal_event.lock().unwrap(),
3334            "Refusal event should be emitted for tool result refusals"
3335        );
3336
3337        thread.read_with(cx, |thread, _| {
3338            let entries = thread.entries();
3339            assert!(entries.len() >= 2, "Should have user message and tool call");
3340
3341            // Verify user message is still there
3342            assert!(
3343                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3344                "User message should not be truncated"
3345            );
3346
3347            // Verify tool call is there with result
3348            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3349                assert!(
3350                    tool_call.raw_output.is_some(),
3351                    "Tool call should have output"
3352                );
3353            } else {
3354                panic!("Expected tool call at index 1");
3355            }
3356        });
3357    }
3358
3359    #[gpui::test]
3360    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3361        init_test(cx);
3362
3363        let fs = FakeFs::new(cx.executor());
3364        let project = Project::test(fs, None, cx).await;
3365
3366        let refuse_next = Arc::new(AtomicBool::new(false));
3367        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3368            let refuse_next = refuse_next.clone();
3369            move |_request, _thread, _cx| {
3370                if refuse_next.load(SeqCst) {
3371                    async move {
3372                        Ok(acp::PromptResponse {
3373                            stop_reason: acp::StopReason::Refusal,
3374                            meta: None,
3375                        })
3376                    }
3377                    .boxed_local()
3378                } else {
3379                    async move {
3380                        Ok(acp::PromptResponse {
3381                            stop_reason: acp::StopReason::EndTurn,
3382                            meta: None,
3383                        })
3384                    }
3385                    .boxed_local()
3386                }
3387            }
3388        }));
3389
3390        let thread = cx
3391            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3392            .await
3393            .unwrap();
3394
3395        // Track if we see a Refusal event
3396        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3397        let saw_refusal_event_captured = saw_refusal_event.clone();
3398        thread.update(cx, |_thread, cx| {
3399            cx.subscribe(
3400                &thread,
3401                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3402                    if matches!(event, AcpThreadEvent::Refusal) {
3403                        *saw_refusal_event_captured.lock().unwrap() = true;
3404                    }
3405                },
3406            )
3407            .detach();
3408        });
3409
3410        // Send a message that will be refused
3411        refuse_next.store(true, SeqCst);
3412        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3413            .await
3414            .unwrap();
3415
3416        // Verify that a Refusal event WAS emitted for user prompt refusal
3417        assert!(
3418            *saw_refusal_event.lock().unwrap(),
3419            "Refusal event should be emitted for user prompt refusals"
3420        );
3421
3422        // Verify the message was truncated (user prompt refusal)
3423        thread.read_with(cx, |thread, cx| {
3424            assert_eq!(thread.to_markdown(cx), "");
3425        });
3426    }
3427
3428    #[gpui::test]
3429    async fn test_refusal(cx: &mut TestAppContext) {
3430        init_test(cx);
3431        let fs = FakeFs::new(cx.background_executor.clone());
3432        fs.insert_tree(path!("/"), json!({})).await;
3433        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3434
3435        let refuse_next = Arc::new(AtomicBool::new(false));
3436        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3437            let refuse_next = refuse_next.clone();
3438            move |request, thread, mut cx| {
3439                let refuse_next = refuse_next.clone();
3440                async move {
3441                    if refuse_next.load(SeqCst) {
3442                        return Ok(acp::PromptResponse {
3443                            stop_reason: acp::StopReason::Refusal,
3444                            meta: None,
3445                        });
3446                    }
3447
3448                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3449                        panic!("expected text content block");
3450                    };
3451                    thread.update(&mut cx, |thread, cx| {
3452                        thread
3453                            .handle_session_update(
3454                                acp::SessionUpdate::AgentMessageChunk {
3455                                    content: content.text.to_uppercase().into(),
3456                                },
3457                                cx,
3458                            )
3459                            .unwrap();
3460                    })?;
3461                    Ok(acp::PromptResponse {
3462                        stop_reason: acp::StopReason::EndTurn,
3463                        meta: None,
3464                    })
3465                }
3466                .boxed_local()
3467            }
3468        }));
3469        let thread = cx
3470            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3471            .await
3472            .unwrap();
3473
3474        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3475            .await
3476            .unwrap();
3477        thread.read_with(cx, |thread, cx| {
3478            assert_eq!(
3479                thread.to_markdown(cx),
3480                indoc! {"
3481                    ## User
3482
3483                    hello
3484
3485                    ## Assistant
3486
3487                    HELLO
3488
3489                "}
3490            );
3491        });
3492
3493        // Simulate refusing the second message. The message should be truncated
3494        // when a user prompt is refused.
3495        refuse_next.store(true, SeqCst);
3496        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3497            .await
3498            .unwrap();
3499        thread.read_with(cx, |thread, cx| {
3500            assert_eq!(
3501                thread.to_markdown(cx),
3502                indoc! {"
3503                    ## User
3504
3505                    hello
3506
3507                    ## Assistant
3508
3509                    HELLO
3510
3511                "}
3512            );
3513        });
3514    }
3515
3516    async fn run_until_first_tool_call(
3517        thread: &Entity<AcpThread>,
3518        cx: &mut TestAppContext,
3519    ) -> usize {
3520        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3521
3522        let subscription = cx.update(|cx| {
3523            cx.subscribe(thread, move |thread, _, cx| {
3524                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3525                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3526                        return tx.try_send(ix).unwrap();
3527                    }
3528                }
3529            })
3530        });
3531
3532        select! {
3533            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3534                panic!("Timeout waiting for tool call")
3535            }
3536            ix = rx.next().fuse() => {
3537                drop(subscription);
3538                ix.unwrap()
3539            }
3540        }
3541    }
3542
3543    #[derive(Clone, Default)]
3544    struct FakeAgentConnection {
3545        auth_methods: Vec<acp::AuthMethod>,
3546        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3547        on_user_message: Option<
3548            Rc<
3549                dyn Fn(
3550                        acp::PromptRequest,
3551                        WeakEntity<AcpThread>,
3552                        AsyncApp,
3553                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3554                    + 'static,
3555            >,
3556        >,
3557    }
3558
3559    impl FakeAgentConnection {
3560        fn new() -> Self {
3561            Self {
3562                auth_methods: Vec::new(),
3563                on_user_message: None,
3564                sessions: Arc::default(),
3565            }
3566        }
3567
3568        #[expect(unused)]
3569        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3570            self.auth_methods = auth_methods;
3571            self
3572        }
3573
3574        fn on_user_message(
3575            mut self,
3576            handler: impl Fn(
3577                acp::PromptRequest,
3578                WeakEntity<AcpThread>,
3579                AsyncApp,
3580            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3581            + 'static,
3582        ) -> Self {
3583            self.on_user_message.replace(Rc::new(handler));
3584            self
3585        }
3586    }
3587
3588    impl AgentConnection for FakeAgentConnection {
3589        fn auth_methods(&self) -> &[acp::AuthMethod] {
3590            &self.auth_methods
3591        }
3592
3593        fn new_thread(
3594            self: Rc<Self>,
3595            project: Entity<Project>,
3596            _cwd: &Path,
3597            cx: &mut App,
3598        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3599            let session_id = acp::SessionId(
3600                rand::rng()
3601                    .sample_iter(&distr::Alphanumeric)
3602                    .take(7)
3603                    .map(char::from)
3604                    .collect::<String>()
3605                    .into(),
3606            );
3607            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3608            let thread = cx.new(|cx| {
3609                AcpThread::new(
3610                    "Test",
3611                    self.clone(),
3612                    project,
3613                    action_log,
3614                    session_id.clone(),
3615                    watch::Receiver::constant(acp::PromptCapabilities {
3616                        image: true,
3617                        audio: true,
3618                        embedded_context: true,
3619                        meta: None,
3620                    }),
3621                    cx,
3622                )
3623            });
3624            self.sessions.lock().insert(session_id, thread.downgrade());
3625            Task::ready(Ok(thread))
3626        }
3627
3628        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3629            if self.auth_methods().iter().any(|m| m.id == method) {
3630                Task::ready(Ok(()))
3631            } else {
3632                Task::ready(Err(anyhow!("Invalid Auth Method")))
3633            }
3634        }
3635
3636        fn prompt(
3637            &self,
3638            _id: Option<UserMessageId>,
3639            params: acp::PromptRequest,
3640            cx: &mut App,
3641        ) -> Task<gpui::Result<acp::PromptResponse>> {
3642            let sessions = self.sessions.lock();
3643            let thread = sessions.get(&params.session_id).unwrap();
3644            if let Some(handler) = &self.on_user_message {
3645                let handler = handler.clone();
3646                let thread = thread.clone();
3647                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3648            } else {
3649                Task::ready(Ok(acp::PromptResponse {
3650                    stop_reason: acp::StopReason::EndTurn,
3651                    meta: None,
3652                }))
3653            }
3654        }
3655
3656        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3657            let sessions = self.sessions.lock();
3658            let thread = sessions.get(session_id).unwrap().clone();
3659
3660            cx.spawn(async move |cx| {
3661                thread
3662                    .update(cx, |thread, cx| thread.cancel(cx))
3663                    .unwrap()
3664                    .await
3665            })
3666            .detach();
3667        }
3668
3669        fn truncate(
3670            &self,
3671            session_id: &acp::SessionId,
3672            _cx: &App,
3673        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3674            Some(Rc::new(FakeAgentSessionEditor {
3675                _session_id: session_id.clone(),
3676            }))
3677        }
3678
3679        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3680            self
3681        }
3682    }
3683
3684    struct FakeAgentSessionEditor {
3685        _session_id: acp::SessionId,
3686    }
3687
3688    impl AgentSessionTruncate for FakeAgentSessionEditor {
3689        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3690            Task::ready(Ok(()))
3691        }
3692    }
3693
3694    #[gpui::test]
3695    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3696        init_test(cx);
3697
3698        let fs = FakeFs::new(cx.executor());
3699        let project = Project::test(fs, [], cx).await;
3700        let connection = Rc::new(FakeAgentConnection::new());
3701        let thread = cx
3702            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3703            .await
3704            .unwrap();
3705
3706        // Try to update a tool call that doesn't exist
3707        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3708        thread.update(cx, |thread, cx| {
3709            let result = thread.handle_session_update(
3710                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3711                    id: nonexistent_id.clone(),
3712                    fields: acp::ToolCallUpdateFields {
3713                        status: Some(acp::ToolCallStatus::Completed),
3714                        ..Default::default()
3715                    },
3716                    meta: None,
3717                }),
3718                cx,
3719            );
3720
3721            // The update should succeed (not return an error)
3722            assert!(result.is_ok());
3723
3724            // There should now be exactly one entry in the thread
3725            assert_eq!(thread.entries.len(), 1);
3726
3727            // The entry should be a failed tool call
3728            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3729                assert_eq!(tool_call.id, nonexistent_id);
3730                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3731                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3732
3733                // Check that the content contains the error message
3734                assert_eq!(tool_call.content.len(), 1);
3735                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3736                    match content_block {
3737                        ContentBlock::Markdown { markdown } => {
3738                            let markdown_text = markdown.read(cx).source();
3739                            assert!(markdown_text.contains("Tool call not found"));
3740                        }
3741                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3742                        ContentBlock::ResourceLink { .. } => {
3743                            panic!("Expected markdown content, got resource link")
3744                        }
3745                    }
3746                } else {
3747                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3748                }
3749            } else {
3750                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3751            }
3752        });
3753    }
3754}