1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use ::terminal::terminal_settings::TerminalSettings;
   7use agent_settings::AgentSettings;
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use language::language_settings::FormatOnSave;
  12pub use mention::*;
  13use project::lsp_store::{FormatTrigger, LspFormatTarget};
  14use serde::{Deserialize, Serialize};
  15use settings::{Settings as _, SettingsLocation};
  16use task::{Shell, ShellBuilder};
  17pub use terminal::*;
  18
  19use action_log::ActionLog;
  20use agent_client_protocol::{self as acp};
  21use anyhow::{Context as _, Result, anyhow};
  22use editor::Bias;
  23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  25use itertools::Itertools;
  26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  27use markdown::Markdown;
  28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  29use std::collections::HashMap;
  30use std::error::Error;
  31use std::fmt::{Formatter, Write};
  32use std::ops::Range;
  33use std::process::ExitStatus;
  34use std::rc::Rc;
  35use std::time::{Duration, Instant};
  36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  37use ui::App;
  38use util::{ResultExt, get_default_system_shell_preferring_bash};
  39use uuid::Uuid;
  40
/// A user-authored entry in an agent thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one is available.
    pub id: Option<UserMessageId>,
    /// Content of the message; this is what `to_markdown` renders.
    pub content: ContentBlock,
    /// Protocol-level content blocks associated with the message.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm
    // against the code that constructs this struct.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message.
    pub checkpoint: Option<Checkpoint>,
}
  48
/// A git-store checkpoint that can be attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message's markdown heading reads
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
  54
  55impl UserMessage {
  56    fn to_markdown(&self, cx: &App) -> String {
  57        let mut markdown = String::new();
  58        if self
  59            .checkpoint
  60            .as_ref()
  61            .is_some_and(|checkpoint| checkpoint.show)
  62        {
  63            writeln!(markdown, "## User (checkpoint)").unwrap();
  64        } else {
  65            writeln!(markdown, "## User").unwrap();
  66        }
  67        writeln!(markdown).unwrap();
  68        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  69        writeln!(markdown).unwrap();
  70        markdown
  71    }
  72}
  73
/// An assistant-authored entry in an agent thread.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks (regular messages and thoughts) that make up this message,
    /// in the order they are rendered by `to_markdown`.
    pub chunks: Vec<AssistantMessageChunk>,
}
  78
  79impl AssistantMessage {
  80    pub fn to_markdown(&self, cx: &App) -> String {
  81        format!(
  82            "## Assistant\n\n{}\n\n",
  83            self.chunks
  84                .iter()
  85                .map(|chunk| chunk.to_markdown(cx))
  86                .join("\n\n")
  87        )
  88    }
  89}
  90
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular assistant output; rendered verbatim in markdown exports.
    Message { block: ContentBlock },
    /// Assistant reasoning; rendered wrapped in `<thinking>` tags in markdown
    /// exports.
    Thought { block: ContentBlock },
}
  96
  97impl AssistantMessageChunk {
  98    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  99        Self::Message {
 100            block: ContentBlock::new(chunk.into(), language_registry, cx),
 101        }
 102    }
 103
 104    fn to_markdown(&self, cx: &App) -> String {
 105        match self {
 106            Self::Message { block } => block.to_markdown(cx).to_string(),
 107            Self::Thought { block } => {
 108                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 109            }
 110        }
 111    }
 112}
 113
/// A single entry in an agent thread: a user message, an assistant message,
/// or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation, including its content and status.
    ToolCall(ToolCall),
}
 120
 121impl AgentThreadEntry {
 122    pub fn to_markdown(&self, cx: &App) -> String {
 123        match self {
 124            Self::UserMessage(message) => message.to_markdown(cx),
 125            Self::AssistantMessage(message) => message.to_markdown(cx),
 126            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 127        }
 128    }
 129
 130    pub const fn user_message(&self) -> Option<&UserMessage> {
 131        if let AgentThreadEntry::UserMessage(message) = self {
 132            Some(message)
 133        } else {
 134            None
 135        }
 136    }
 137
 138    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 139        if let AgentThreadEntry::ToolCall(call) = self {
 140            itertools::Either::Left(call.diffs())
 141        } else {
 142            itertools::Either::Right(std::iter::empty())
 143        }
 144    }
 145
 146    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 147        if let AgentThreadEntry::ToolCall(call) = self {
 148            itertools::Either::Left(call.terminals())
 149        } else {
 150            itertools::Either::Right(std::iter::empty())
 151        }
 152    }
 153
 154    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 155        if let AgentThreadEntry::ToolCall(ToolCall {
 156            locations,
 157            resolved_locations,
 158            ..
 159        }) = self
 160        {
 161            Some((
 162                locations.get(ix)?.clone(),
 163                resolved_locations.get(ix)?.clone()?,
 164            ))
 165        } else {
 166            None
 167        }
 168    }
 169}
 170
/// A tool invocation shown in the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown label built from the tool call title; multi-line titles are
    /// cut to their first line followed by `…`.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported by the protocol.
    pub kind: acp::ToolKind,
    /// Content items (content blocks, diffs, terminals) attached to the call.
    pub content: Vec<ToolCallContent>,
    /// Current status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for the call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, paired by index (see
    /// `AgentThreadEntry::location`). Starts out empty.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw JSON output of the call, if provided.
    pub raw_output: Option<serde_json::Value>,
}
 183
 184impl ToolCall {
 185    fn from_acp(
 186        tool_call: acp::ToolCall,
 187        status: ToolCallStatus,
 188        language_registry: Arc<LanguageRegistry>,
 189        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 190        cx: &mut App,
 191    ) -> Result<Self> {
 192        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 193            first_line.to_owned() + "…"
 194        } else {
 195            tool_call.title
 196        };
 197        let mut content = Vec::with_capacity(tool_call.content.len());
 198        for item in tool_call.content {
 199            content.push(ToolCallContent::from_acp(
 200                item,
 201                language_registry.clone(),
 202                terminals,
 203                cx,
 204            )?);
 205        }
 206
 207        let result = Self {
 208            id: tool_call.id,
 209            label: cx
 210                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 211            kind: tool_call.kind,
 212            content,
 213            locations: tool_call.locations,
 214            resolved_locations: Vec::default(),
 215            status,
 216            raw_input: tool_call.raw_input,
 217            raw_output: tool_call.raw_output,
 218        };
 219        Ok(result)
 220    }
 221
 222    fn update_fields(
 223        &mut self,
 224        fields: acp::ToolCallUpdateFields,
 225        language_registry: Arc<LanguageRegistry>,
 226        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 227        cx: &mut App,
 228    ) -> Result<()> {
 229        let acp::ToolCallUpdateFields {
 230            kind,
 231            status,
 232            title,
 233            content,
 234            locations,
 235            raw_input,
 236            raw_output,
 237        } = fields;
 238
 239        if let Some(kind) = kind {
 240            self.kind = kind;
 241        }
 242
 243        if let Some(status) = status {
 244            self.status = status.into();
 245        }
 246
 247        if let Some(title) = title {
 248            self.label.update(cx, |label, cx| {
 249                if let Some((first_line, _)) = title.split_once("\n") {
 250                    label.replace(first_line.to_owned() + "…", cx)
 251                } else {
 252                    label.replace(title, cx);
 253                }
 254            });
 255        }
 256
 257        if let Some(content) = content {
 258            let new_content_len = content.len();
 259            let mut content = content.into_iter();
 260
 261            // Reuse existing content if we can
 262            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 263                old.update_from_acp(new, language_registry.clone(), terminals, cx)?;
 264            }
 265            for new in content {
 266                self.content.push(ToolCallContent::from_acp(
 267                    new,
 268                    language_registry.clone(),
 269                    terminals,
 270                    cx,
 271                )?)
 272            }
 273            self.content.truncate(new_content_len);
 274        }
 275
 276        if let Some(locations) = locations {
 277            self.locations = locations;
 278        }
 279
 280        if let Some(raw_input) = raw_input {
 281            self.raw_input = Some(raw_input);
 282        }
 283
 284        if let Some(raw_output) = raw_output {
 285            if self.content.is_empty()
 286                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 287            {
 288                self.content
 289                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 290                        markdown,
 291                    }));
 292            }
 293            self.raw_output = Some(raw_output);
 294        }
 295        Ok(())
 296    }
 297
 298    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 299        self.content.iter().filter_map(|content| match content {
 300            ToolCallContent::Diff(diff) => Some(diff),
 301            ToolCallContent::ContentBlock(_) => None,
 302            ToolCallContent::Terminal(_) => None,
 303        })
 304    }
 305
 306    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 307        self.content.iter().filter_map(|content| match content {
 308            ToolCallContent::Terminal(terminal) => Some(terminal),
 309            ToolCallContent::ContentBlock(_) => None,
 310            ToolCallContent::Diff(_) => None,
 311        })
 312    }
 313
 314    fn to_markdown(&self, cx: &App) -> String {
 315        let mut markdown = format!(
 316            "**Tool Call: {}**\nStatus: {}\n\n",
 317            self.label.read(cx).source(),
 318            self.status
 319        );
 320        for content in &self.content {
 321            markdown.push_str(content.to_markdown(cx).as_str());
 322            markdown.push_str("\n\n");
 323        }
 324        markdown
 325    }
 326
 327    async fn resolve_location(
 328        location: acp::ToolCallLocation,
 329        project: WeakEntity<Project>,
 330        cx: &mut AsyncApp,
 331    ) -> Option<AgentLocation> {
 332        let buffer = project
 333            .update(cx, |project, cx| {
 334                project
 335                    .project_path_for_absolute_path(&location.path, cx)
 336                    .map(|path| project.open_buffer(path, cx))
 337            })
 338            .ok()??;
 339        let buffer = buffer.await.log_err()?;
 340        let position = buffer
 341            .update(cx, |buffer, _| {
 342                if let Some(row) = location.line {
 343                    let snapshot = buffer.snapshot();
 344                    let column = snapshot.indent_size_for_line(row).len;
 345                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 346                    snapshot.anchor_before(point)
 347                } else {
 348                    Anchor::MIN
 349                }
 350            })
 351            .ok()?;
 352
 353        Some(AgentLocation {
 354            buffer: buffer.downgrade(),
 355            position,
 356        })
 357    }
 358
 359    fn resolve_locations(
 360        &self,
 361        project: Entity<Project>,
 362        cx: &mut App,
 363    ) -> Task<Vec<Option<AgentLocation>>> {
 364        let locations = self.locations.clone();
 365        project.update(cx, |_, cx| {
 366            cx.spawn(async move |project, cx| {
 367                let mut new_locations = Vec::new();
 368                for location in locations {
 369                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 370                }
 371                new_locations
 372            })
 373        })
 374    }
 375}
 376
/// Lifecycle state of a [`ToolCall`].
///
/// `Pending`, `InProgress`, `Completed` and `Failed` can be produced from
/// `acp::ToolCallStatus`; the remaining variants have no protocol counterpart
/// in that conversion.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered to the user.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the id of the chosen option is sent.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 398
 399impl From<acp::ToolCallStatus> for ToolCallStatus {
 400    fn from(status: acp::ToolCallStatus) -> Self {
 401        match status {
 402            acp::ToolCallStatus::Pending => Self::Pending,
 403            acp::ToolCallStatus::InProgress => Self::InProgress,
 404            acp::ToolCallStatus::Completed => Self::Completed,
 405            acp::ToolCallStatus::Failed => Self::Failed,
 406        }
 407    }
 408}
 409
 410impl Display for ToolCallStatus {
 411    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 412        write!(
 413            f,
 414            "{}",
 415            match self {
 416                ToolCallStatus::Pending => "Pending",
 417                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 418                ToolCallStatus::InProgress => "In Progress",
 419                ToolCallStatus::Completed => "Completed",
 420                ToolCallStatus::Failed => "Failed",
 421                ToolCallStatus::Rejected => "Rejected",
 422                ToolCallStatus::Canceled => "Canceled",
 423            }
 424        )
 425    }
 426}
 427
/// Displayable content accumulated from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; further appended blocks are added to this entity.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link. This is the state after a resource link is
    /// appended to an empty block; appending anything else afterwards turns
    /// the block into `Markdown`.
    ResourceLink { resource_link: acp::ResourceLink },
}
 434
 435impl ContentBlock {
 436    pub fn new(
 437        block: acp::ContentBlock,
 438        language_registry: &Arc<LanguageRegistry>,
 439        cx: &mut App,
 440    ) -> Self {
 441        let mut this = Self::Empty;
 442        this.append(block, language_registry, cx);
 443        this
 444    }
 445
 446    pub fn new_combined(
 447        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 448        language_registry: Arc<LanguageRegistry>,
 449        cx: &mut App,
 450    ) -> Self {
 451        let mut this = Self::Empty;
 452        for block in blocks {
 453            this.append(block, &language_registry, cx);
 454        }
 455        this
 456    }
 457
 458    pub fn append(
 459        &mut self,
 460        block: acp::ContentBlock,
 461        language_registry: &Arc<LanguageRegistry>,
 462        cx: &mut App,
 463    ) {
 464        if matches!(self, ContentBlock::Empty)
 465            && let acp::ContentBlock::ResourceLink(resource_link) = block
 466        {
 467            *self = ContentBlock::ResourceLink { resource_link };
 468            return;
 469        }
 470
 471        let new_content = self.block_string_contents(block);
 472
 473        match self {
 474            ContentBlock::Empty => {
 475                *self = Self::create_markdown_block(new_content, language_registry, cx);
 476            }
 477            ContentBlock::Markdown { markdown } => {
 478                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 479            }
 480            ContentBlock::ResourceLink { resource_link } => {
 481                let existing_content = Self::resource_link_md(&resource_link.uri);
 482                let combined = format!("{}\n{}", existing_content, new_content);
 483
 484                *self = Self::create_markdown_block(combined, language_registry, cx);
 485            }
 486        }
 487    }
 488
 489    fn create_markdown_block(
 490        content: String,
 491        language_registry: &Arc<LanguageRegistry>,
 492        cx: &mut App,
 493    ) -> ContentBlock {
 494        ContentBlock::Markdown {
 495            markdown: cx
 496                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 497        }
 498    }
 499
 500    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 501        match block {
 502            acp::ContentBlock::Text(text_content) => text_content.text,
 503            acp::ContentBlock::ResourceLink(resource_link) => {
 504                Self::resource_link_md(&resource_link.uri)
 505            }
 506            acp::ContentBlock::Resource(acp::EmbeddedResource {
 507                resource:
 508                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 509                        uri,
 510                        ..
 511                    }),
 512                ..
 513            }) => Self::resource_link_md(&uri),
 514            acp::ContentBlock::Image(image) => Self::image_md(&image),
 515            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 516        }
 517    }
 518
 519    fn resource_link_md(uri: &str) -> String {
 520        if let Some(uri) = MentionUri::parse(uri).log_err() {
 521            uri.as_link().to_string()
 522        } else {
 523            uri.to_string()
 524        }
 525    }
 526
 527    fn image_md(_image: &acp::ImageContent) -> String {
 528        "`Image`".into()
 529    }
 530
 531    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 532        match self {
 533            ContentBlock::Empty => "",
 534            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 535            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 536        }
 537    }
 538
 539    pub const fn markdown(&self) -> Option<&Entity<Markdown>> {
 540        match self {
 541            ContentBlock::Empty => None,
 542            ContentBlock::Markdown { markdown } => Some(markdown),
 543            ContentBlock::ResourceLink { .. } => None,
 544        }
 545    }
 546
 547    pub const fn resource_link(&self) -> Option<&acp::ResourceLink> {
 548        match self {
 549            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 550            _ => None,
 551        }
 552    }
 553}
 554
/// One item of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Text-like content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity, looked up by id among the known terminals when
    /// converted from the protocol representation.
    Terminal(Entity<Terminal>),
}
 561
 562impl ToolCallContent {
 563    pub fn from_acp(
 564        content: acp::ToolCallContent,
 565        language_registry: Arc<LanguageRegistry>,
 566        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 567        cx: &mut App,
 568    ) -> Result<Self> {
 569        match content {
 570            acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
 571                content,
 572                &language_registry,
 573                cx,
 574            ))),
 575            acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
 576                Diff::finalized(
 577                    diff.path.to_string_lossy().into_owned(),
 578                    diff.old_text,
 579                    diff.new_text,
 580                    language_registry,
 581                    cx,
 582                )
 583            }))),
 584            acp::ToolCallContent::Terminal { terminal_id } => terminals
 585                .get(&terminal_id)
 586                .cloned()
 587                .map(Self::Terminal)
 588                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 589        }
 590    }
 591
 592    pub fn update_from_acp(
 593        &mut self,
 594        new: acp::ToolCallContent,
 595        language_registry: Arc<LanguageRegistry>,
 596        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 597        cx: &mut App,
 598    ) -> Result<()> {
 599        let needs_update = match (&self, &new) {
 600            (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
 601                old_diff.read(cx).needs_update(
 602                    new_diff.old_text.as_deref().unwrap_or(""),
 603                    &new_diff.new_text,
 604                    cx,
 605                )
 606            }
 607            _ => true,
 608        };
 609
 610        if needs_update {
 611            *self = Self::from_acp(new, language_registry, terminals, cx)?;
 612        }
 613        Ok(())
 614    }
 615
 616    pub fn to_markdown(&self, cx: &App) -> String {
 617        match self {
 618            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 619            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 620            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 621        }
 622    }
 623}
 624
/// An update targeting an existing tool call, identified by the tool call id
/// carried in each variant.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol-level field update.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 631
 632impl ToolCallUpdate {
 633    const fn id(&self) -> &acp::ToolCallId {
 634        match self {
 635            Self::UpdateFields(update) => &update.id,
 636            Self::UpdateDiff(diff) => &diff.id,
 637            Self::UpdateTerminal(terminal) => &terminal.id,
 638        }
 639    }
 640}
 641
 642impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 643    fn from(update: acp::ToolCallUpdate) -> Self {
 644        Self::UpdateFields(update)
 645    }
 646}
 647
 648impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 649    fn from(diff: ToolCallUpdateDiff) -> Self {
 650        Self::UpdateDiff(diff)
 651    }
 652}
 653
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
 659
 660impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 661    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 662        Self::UpdateTerminal(terminal)
 663    }
 664}
 665
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
 671
/// A list of plan entries; the default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// The entries of the plan, in order.
    pub entries: Vec<PlanEntry>,
}
 676
/// Summary of a [`Plan`], as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 683
 684impl Plan {
 685    pub const fn is_empty(&self) -> bool {
 686        self.entries.is_empty()
 687    }
 688
 689    pub fn stats(&self) -> PlanStats<'_> {
 690        let mut stats = PlanStats {
 691            in_progress_entry: None,
 692            pending: 0,
 693            completed: 0,
 694        };
 695
 696        for entry in &self.entries {
 697            match &entry.status {
 698                acp::PlanEntryStatus::Pending => {
 699                    stats.pending += 1;
 700                }
 701                acp::PlanEntryStatus::InProgress => {
 702                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 703                }
 704                acp::PlanEntryStatus::Completed => {
 705                    stats.completed += 1;
 706                }
 707            }
 708        }
 709
 710        stats
 711    }
 712}
 713
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as reported by the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status of the entry, as reported by the protocol.
    pub status: acp::PlanEntryStatus,
}
 720
 721impl PlanEntry {
 722    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 723        Self {
 724            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 725            priority: entry.priority,
 726            status: entry.status,
 727        }
 728    }
 729}
 730
/// Token consumption relative to a limit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` is treated by `ratio` as "unknown".
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
 736
 737impl TokenUsage {
 738    pub fn ratio(&self) -> TokenUsageRatio {
 739        #[cfg(debug_assertions)]
 740        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 741            .unwrap_or("0.8".to_string())
 742            .parse()
 743            .unwrap();
 744        #[cfg(not(debug_assertions))]
 745        let warning_threshold: f32 = 0.8;
 746
 747        // When the maximum is unknown because there is no selected model,
 748        // avoid showing the token limit warning.
 749        if self.max_tokens == 0 {
 750            TokenUsageRatio::Normal
 751        } else if self.used_tokens >= self.max_tokens {
 752            TokenUsageRatio::Exceeded
 753        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 754            TokenUsageRatio::Warning
 755        } else {
 756            TokenUsageRatio::Normal
 757        }
 758    }
 759}
 760
/// Classification returned by `TokenUsage::ratio`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage is at or above the warning threshold but below the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
 767
/// Information about a retry, carried by `AcpThreadEvent::Retry`.
// NOTE(review): the field descriptions below are inferred from names and
// types only; the code that fills them in is not part of this excerpt.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Text of the most recent error (presumably the one that caused the retry).
    pub last_error: SharedString,
    /// Attempt counter.
    pub attempt: usize,
    /// Upper bound on the number of attempts.
    pub max_attempts: usize,
    /// Instant associated with the start of the retry.
    pub started_at: Instant,
    /// Duration associated with the retry (presumably the wait before retrying — confirm).
    pub duration: Duration,
}
 776
/// State of a single agent thread (session).
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Entries of the thread (user messages, assistant messages, tool calls).
    entries: Vec<AgentThreadEntry>,
    /// The current plan.
    plan: Plan,
    /// The project this thread operates on.
    project: Entity<Project>,
    /// Action log entity associated with the thread.
    action_log: Entity<ActionLog>,
    /// Buffers paired with a snapshot.
    // NOTE(review): presumably the snapshot last shared with the agent — confirm.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task handle for an in-flight send, if any.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Protocol session id of this thread.
    session_id: acp::SessionId,
    /// Latest known token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Current prompt capabilities; replaced whenever the capabilities
    /// receiver passed to `new` yields a new value.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task handle kept alongside the thread.
    // NOTE(review): presumably the task spawned in `new` that watches prompt
    // capabilities — the assignment is outside this excerpt; confirm.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, keyed by terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminal ids that are not registered yet;
    /// written to the terminal once its `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminal ids that are not registered yet;
    /// consumed once the terminal's `Created` event arrives.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
 794
/// Events emitted by [`AcpThread`].
// NOTE(review): apart from `PromptCapabilitiesUpdated`, the emit sites of
// these variants are outside this excerpt, so they are left undocumented
// here rather than described from their names alone.
#[derive(Debug)]
pub enum AcpThreadEvent {
    NewEntry,
    TitleUpdated,
    TokenUsageUpdated,
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Retry(RetryStatus),
    Stopped,
    Error,
    LoadError(LoadError),
    /// Emitted after the thread has stored newly received prompt
    /// capabilities.
    PromptCapabilitiesUpdated,
    Refusal,
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    ModeUpdated(acp::SessionModeId),
}
 812
// Allows `AcpThread` to emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 814
/// Events about terminals, handled by
/// `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created. The thread registers it and then flushes any
    /// output or exit status that arrived for this id beforehand.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal. Written to the terminal when it is
    /// already registered, buffered otherwise.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed. Applied as the terminal's breadcrumb
    /// text; ignored when the terminal is not registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited. Triggers a notification on the registered
    /// terminal, or is remembered until the terminal is created.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
 837
/// Commands addressed to a terminal, identified by its terminal id.
// NOTE(review): the consumer of these commands is outside this excerpt; the
// variant descriptions are inferred from names and fields — confirm.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write `bytes` as input to the terminal.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to `cols` x `rows`.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
 853
 854impl AcpThread {
 855    pub fn on_terminal_provider_event(
 856        &mut self,
 857        event: TerminalProviderEvent,
 858        cx: &mut Context<Self>,
 859    ) {
 860        match event {
 861            TerminalProviderEvent::Created {
 862                terminal_id,
 863                label,
 864                cwd,
 865                output_byte_limit,
 866                terminal,
 867            } => {
 868                let entity = self.register_terminal_created(
 869                    terminal_id.clone(),
 870                    label,
 871                    cwd,
 872                    output_byte_limit,
 873                    terminal,
 874                    cx,
 875                );
 876
 877                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 878                    for data in chunks.drain(..) {
 879                        entity.update(cx, |term, cx| {
 880                            term.inner().update(cx, |inner, cx| {
 881                                inner.write_output(&data, cx);
 882                            })
 883                        });
 884                    }
 885                }
 886
 887                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 888                    entity.update(cx, |_term, cx| {
 889                        cx.notify();
 890                    });
 891                }
 892
 893                cx.notify();
 894            }
 895            TerminalProviderEvent::Output { terminal_id, data } => {
 896                if let Some(entity) = self.terminals.get(&terminal_id) {
 897                    entity.update(cx, |term, cx| {
 898                        term.inner().update(cx, |inner, cx| {
 899                            inner.write_output(&data, cx);
 900                        })
 901                    });
 902                } else {
 903                    self.pending_terminal_output
 904                        .entry(terminal_id)
 905                        .or_default()
 906                        .push(data);
 907                }
 908            }
 909            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 910                if let Some(entity) = self.terminals.get(&terminal_id) {
 911                    entity.update(cx, |term, cx| {
 912                        term.inner().update(cx, |inner, cx| {
 913                            inner.breadcrumb_text = title;
 914                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 915                        })
 916                    });
 917                }
 918            }
 919            TerminalProviderEvent::Exit {
 920                terminal_id,
 921                status,
 922            } => {
 923                if let Some(entity) = self.terminals.get(&terminal_id) {
 924                    entity.update(cx, |_term, cx| {
 925                        cx.notify();
 926                    });
 927                } else {
 928                    self.pending_terminal_exit.insert(terminal_id, status);
 929                }
 930            }
 931        }
 932    }
 933}
 934
/// Coarse activity state of a thread.
// NOTE(review): the code that computes this status is outside this excerpt;
// variant meanings are taken from their names — confirm.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// Nothing is being generated.
    Idle,
    /// A response is being generated.
    Generating,
}
 940
 941#[derive(Debug, Clone)]
 942pub enum LoadError {
 943    Unsupported {
 944        command: SharedString,
 945        current_version: SharedString,
 946        minimum_version: SharedString,
 947    },
 948    FailedToInstall(SharedString),
 949    Exited {
 950        status: ExitStatus,
 951    },
 952    Other(SharedString),
 953}
 954
 955impl Display for LoadError {
 956    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 957        match self {
 958            LoadError::Unsupported {
 959                command: path,
 960                current_version,
 961                minimum_version,
 962            } => {
 963                write!(
 964                    f,
 965                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
 966                )
 967            }
 968            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
 969            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
 970            LoadError::Other(msg) => write!(f, "{msg}"),
 971        }
 972    }
 973}
 974
// `LoadError` uses the default `Error` methods; no underlying source is exposed.
impl Error for LoadError {}
 976
 977impl AcpThread {
 978    pub fn new(
 979        title: impl Into<SharedString>,
 980        connection: Rc<dyn AgentConnection>,
 981        project: Entity<Project>,
 982        action_log: Entity<ActionLog>,
 983        session_id: acp::SessionId,
 984        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
 985        cx: &mut Context<Self>,
 986    ) -> Self {
 987        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
 988        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
 989            loop {
 990                let caps = prompt_capabilities_rx.recv().await?;
 991                this.update(cx, |this, cx| {
 992                    this.prompt_capabilities = caps;
 993                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
 994                })?;
 995            }
 996        });
 997
 998        Self {
 999            action_log,
1000            shared_buffers: Default::default(),
1001            entries: Default::default(),
1002            plan: Default::default(),
1003            title: title.into(),
1004            project,
1005            send_task: None,
1006            connection,
1007            session_id,
1008            token_usage: None,
1009            prompt_capabilities,
1010            _observe_prompt_capabilities: task,
1011            terminals: HashMap::default(),
1012            pending_terminal_output: HashMap::default(),
1013            pending_terminal_exit: HashMap::default(),
1014        }
1015    }
1016
    /// Returns a clone of the prompt capabilities currently stored on the thread.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1020
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1024
    /// Returns the action log entity associated with this thread.
    pub const fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1028
    /// Returns the project entity this thread operates on.
    pub const fn project(&self) -> &Entity<Project> {
        &self.project
    }
1032
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1036
    /// Returns all entries of the thread, oldest first (new entries are pushed at the end).
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1040
    /// Returns the id of the session this thread belongs to.
    pub const fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1044
1045    pub const fn status(&self) -> ThreadStatus {
1046        if self.send_task.is_some() {
1047            ThreadStatus::Generating
1048        } else {
1049            ThreadStatus::Idle
1050        }
1051    }
1052
    /// Returns the most recently stored token usage, if any has been recorded.
    pub const fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1056
1057    pub fn has_pending_edit_tool_calls(&self) -> bool {
1058        for entry in self.entries.iter().rev() {
1059            match entry {
1060                AgentThreadEntry::UserMessage(_) => return false,
1061                AgentThreadEntry::ToolCall(
1062                    call @ ToolCall {
1063                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1064                        ..
1065                    },
1066                ) if call.diffs().next().is_some() => {
1067                    return true;
1068                }
1069                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1070            }
1071        }
1072
1073        false
1074    }
1075
1076    pub fn used_tools_since_last_user_message(&self) -> bool {
1077        for entry in self.entries.iter().rev() {
1078            match entry {
1079                AgentThreadEntry::UserMessage(..) => return false,
1080                AgentThreadEntry::AssistantMessage(..) => continue,
1081                AgentThreadEntry::ToolCall(..) => return true,
1082            }
1083        }
1084
1085        false
1086    }
1087
1088    pub fn handle_session_update(
1089        &mut self,
1090        update: acp::SessionUpdate,
1091        cx: &mut Context<Self>,
1092    ) -> Result<(), acp::Error> {
1093        match update {
1094            acp::SessionUpdate::UserMessageChunk { content } => {
1095                self.push_user_content_block(None, content, cx);
1096            }
1097            acp::SessionUpdate::AgentMessageChunk { content } => {
1098                self.push_assistant_content_block(content, false, cx);
1099            }
1100            acp::SessionUpdate::AgentThoughtChunk { content } => {
1101                self.push_assistant_content_block(content, true, cx);
1102            }
1103            acp::SessionUpdate::ToolCall(tool_call) => {
1104                self.upsert_tool_call(tool_call, cx)?;
1105            }
1106            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1107                self.update_tool_call(tool_call_update, cx)?;
1108            }
1109            acp::SessionUpdate::Plan(plan) => {
1110                self.update_plan(plan, cx);
1111            }
1112            acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
1113                cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands))
1114            }
1115            acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
1116                cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id))
1117            }
1118        }
1119        Ok(())
1120    }
1121
1122    pub fn push_user_content_block(
1123        &mut self,
1124        message_id: Option<UserMessageId>,
1125        chunk: acp::ContentBlock,
1126        cx: &mut Context<Self>,
1127    ) {
1128        let language_registry = self.project.read(cx).languages().clone();
1129        let entries_len = self.entries.len();
1130
1131        if let Some(last_entry) = self.entries.last_mut()
1132            && let AgentThreadEntry::UserMessage(UserMessage {
1133                id,
1134                content,
1135                chunks,
1136                ..
1137            }) = last_entry
1138        {
1139            *id = message_id.or(id.take());
1140            content.append(chunk.clone(), &language_registry, cx);
1141            chunks.push(chunk);
1142            let idx = entries_len - 1;
1143            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1144        } else {
1145            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
1146            self.push_entry(
1147                AgentThreadEntry::UserMessage(UserMessage {
1148                    id: message_id,
1149                    content,
1150                    chunks: vec![chunk],
1151                    checkpoint: None,
1152                }),
1153                cx,
1154            );
1155        }
1156    }
1157
1158    pub fn push_assistant_content_block(
1159        &mut self,
1160        chunk: acp::ContentBlock,
1161        is_thought: bool,
1162        cx: &mut Context<Self>,
1163    ) {
1164        let language_registry = self.project.read(cx).languages().clone();
1165        let entries_len = self.entries.len();
1166        if let Some(last_entry) = self.entries.last_mut()
1167            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1168        {
1169            let idx = entries_len - 1;
1170            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1171            match (chunks.last_mut(), is_thought) {
1172                (Some(AssistantMessageChunk::Message { block }), false)
1173                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1174                    block.append(chunk, &language_registry, cx)
1175                }
1176                _ => {
1177                    let block = ContentBlock::new(chunk, &language_registry, cx);
1178                    if is_thought {
1179                        chunks.push(AssistantMessageChunk::Thought { block })
1180                    } else {
1181                        chunks.push(AssistantMessageChunk::Message { block })
1182                    }
1183                }
1184            }
1185        } else {
1186            let block = ContentBlock::new(chunk, &language_registry, cx);
1187            let chunk = if is_thought {
1188                AssistantMessageChunk::Thought { block }
1189            } else {
1190                AssistantMessageChunk::Message { block }
1191            };
1192
1193            self.push_entry(
1194                AgentThreadEntry::AssistantMessage(AssistantMessage {
1195                    chunks: vec![chunk],
1196                }),
1197                cx,
1198            );
1199        }
1200    }
1201
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1206
    /// Returns `true` if the connection provides a title setter for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1210
1211    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1212        if title != self.title {
1213            self.title = title.clone();
1214            cx.emit(AcpThreadEvent::TitleUpdated);
1215            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1216                return set_title.run(title, cx);
1217            }
1218        }
1219        Task::ready(Ok(()))
1220    }
1221
    /// Replaces the stored token usage (including clearing it with `None`) and
    /// emits `AcpThreadEvent::TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1226
    /// Emits `AcpThreadEvent::Retry` carrying `status`; no thread state is stored.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1230
1231    pub fn update_tool_call(
1232        &mut self,
1233        update: impl Into<ToolCallUpdate>,
1234        cx: &mut Context<Self>,
1235    ) -> Result<()> {
1236        let update = update.into();
1237        let languages = self.project.read(cx).languages().clone();
1238
1239        let ix = match self.index_for_tool_call(update.id()) {
1240            Some(ix) => ix,
1241            None => {
1242                // Tool call not found - create a failed tool call entry
1243                let failed_tool_call = ToolCall {
1244                    id: update.id().clone(),
1245                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1246                    kind: acp::ToolKind::Fetch,
1247                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1248                        acp::ContentBlock::Text(acp::TextContent {
1249                            text: "Tool call not found".to_string(),
1250                            annotations: None,
1251                            meta: None,
1252                        }),
1253                        &languages,
1254                        cx,
1255                    ))],
1256                    status: ToolCallStatus::Failed,
1257                    locations: Vec::new(),
1258                    resolved_locations: Vec::new(),
1259                    raw_input: None,
1260                    raw_output: None,
1261                };
1262                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1263                return Ok(());
1264            }
1265        };
1266        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1267            unreachable!()
1268        };
1269
1270        match update {
1271            ToolCallUpdate::UpdateFields(update) => {
1272                let location_updated = update.fields.locations.is_some();
1273                call.update_fields(update.fields, languages, &self.terminals, cx)?;
1274                if location_updated {
1275                    self.resolve_locations(update.id, cx);
1276                }
1277            }
1278            ToolCallUpdate::UpdateDiff(update) => {
1279                call.content.clear();
1280                call.content.push(ToolCallContent::Diff(update.diff));
1281            }
1282            ToolCallUpdate::UpdateTerminal(update) => {
1283                call.content.clear();
1284                call.content
1285                    .push(ToolCallContent::Terminal(update.terminal));
1286            }
1287        }
1288
1289        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1290
1291        Ok(())
1292    }
1293
1294    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1295    pub fn upsert_tool_call(
1296        &mut self,
1297        tool_call: acp::ToolCall,
1298        cx: &mut Context<Self>,
1299    ) -> Result<(), acp::Error> {
1300        let status = tool_call.status.into();
1301        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1302    }
1303
1304    /// Fails if id does not match an existing entry.
1305    pub fn upsert_tool_call_inner(
1306        &mut self,
1307        update: acp::ToolCallUpdate,
1308        status: ToolCallStatus,
1309        cx: &mut Context<Self>,
1310    ) -> Result<(), acp::Error> {
1311        let language_registry = self.project.read(cx).languages().clone();
1312        let id = update.id.clone();
1313
1314        if let Some(ix) = self.index_for_tool_call(&id) {
1315            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1316                unreachable!()
1317            };
1318
1319            call.update_fields(update.fields, language_registry, &self.terminals, cx)?;
1320            call.status = status;
1321
1322            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1323        } else {
1324            let call = ToolCall::from_acp(
1325                update.try_into()?,
1326                status,
1327                language_registry,
1328                &self.terminals,
1329                cx,
1330            )?;
1331            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1332        };
1333
1334        self.resolve_locations(id, cx);
1335        Ok(())
1336    }
1337
1338    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1339        self.entries
1340            .iter()
1341            .enumerate()
1342            .rev()
1343            .find_map(|(index, entry)| {
1344                if let AgentThreadEntry::ToolCall(tool_call) = entry
1345                    && &tool_call.id == id
1346                {
1347                    Some(index)
1348                } else {
1349                    None
1350                }
1351            })
1352    }
1353
1354    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1355        // The tool call we are looking for is typically the last one, or very close to the end.
1356        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1357        self.entries
1358            .iter_mut()
1359            .enumerate()
1360            .rev()
1361            .find_map(|(index, tool_call)| {
1362                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1363                    && &tool_call.id == id
1364                {
1365                    Some((index, tool_call))
1366                } else {
1367                    None
1368                }
1369            })
1370    }
1371
1372    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1373        self.entries
1374            .iter()
1375            .enumerate()
1376            .rev()
1377            .find_map(|(index, tool_call)| {
1378                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1379                    && &tool_call.id == id
1380                {
1381                    Some((index, tool_call))
1382                } else {
1383                    None
1384                }
1385            })
1386    }
1387
1388    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1389        let project = self.project.clone();
1390        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1391            return;
1392        };
1393        let task = tool_call.resolve_locations(project, cx);
1394        cx.spawn(async move |this, cx| {
1395            let resolved_locations = task.await;
1396            this.update(cx, |this, cx| {
1397                let project = this.project.clone();
1398                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1399                    return;
1400                };
1401                if let Some(Some(location)) = resolved_locations.last() {
1402                    project.update(cx, |project, cx| {
1403                        if let Some(agent_location) = project.agent_location() {
1404                            let should_ignore = agent_location.buffer == location.buffer
1405                                && location
1406                                    .buffer
1407                                    .update(cx, |buffer, _| {
1408                                        let snapshot = buffer.snapshot();
1409                                        let old_position =
1410                                            agent_location.position.to_point(&snapshot);
1411                                        let new_position = location.position.to_point(&snapshot);
1412                                        // ignore this so that when we get updates from the edit tool
1413                                        // the position doesn't reset to the startof line
1414                                        old_position.row == new_position.row
1415                                            && old_position.column > new_position.column
1416                                    })
1417                                    .ok()
1418                                    .unwrap_or_default();
1419                            if !should_ignore {
1420                                project.set_agent_location(Some(location.clone()), cx);
1421                            }
1422                        }
1423                    });
1424                }
1425                if tool_call.resolved_locations != resolved_locations {
1426                    tool_call.resolved_locations = resolved_locations;
1427                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1428                }
1429            })
1430        })
1431        .detach();
1432    }
1433
1434    pub fn request_tool_call_authorization(
1435        &mut self,
1436        tool_call: acp::ToolCallUpdate,
1437        options: Vec<acp::PermissionOption>,
1438        respect_always_allow_setting: bool,
1439        cx: &mut Context<Self>,
1440    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1441        let (tx, rx) = oneshot::channel();
1442
1443        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1444            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1445            // some tools would (incorrectly) continue to auto-accept.
1446            if let Some(allow_once_option) = options.iter().find_map(|option| {
1447                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1448                    Some(option.id.clone())
1449                } else {
1450                    None
1451                }
1452            }) {
1453                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1454                return Ok(async {
1455                    acp::RequestPermissionOutcome::Selected {
1456                        option_id: allow_once_option,
1457                    }
1458                }
1459                .boxed());
1460            }
1461        }
1462
1463        let status = ToolCallStatus::WaitingForConfirmation {
1464            options,
1465            respond_tx: tx,
1466        };
1467
1468        self.upsert_tool_call_inner(tool_call, status, cx)?;
1469        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1470
1471        let fut = async {
1472            match rx.await {
1473                Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1474                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1475            }
1476        }
1477        .boxed();
1478
1479        Ok(fut)
1480    }
1481
1482    pub fn authorize_tool_call(
1483        &mut self,
1484        id: acp::ToolCallId,
1485        option_id: acp::PermissionOptionId,
1486        option_kind: acp::PermissionOptionKind,
1487        cx: &mut Context<Self>,
1488    ) {
1489        let Some((ix, call)) = self.tool_call_mut(&id) else {
1490            return;
1491        };
1492
1493        let new_status = match option_kind {
1494            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1495                ToolCallStatus::Rejected
1496            }
1497            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1498                ToolCallStatus::InProgress
1499            }
1500        };
1501
1502        let curr_status = mem::replace(&mut call.status, new_status);
1503
1504        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1505            respond_tx.send(option_id).log_err();
1506        } else if cfg!(debug_assertions) {
1507            panic!("tried to authorize an already authorized tool call");
1508        }
1509
1510        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1511    }
1512
1513    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1514        let mut first_tool_call = None;
1515
1516        for entry in self.entries.iter().rev() {
1517            match &entry {
1518                AgentThreadEntry::ToolCall(call) => {
1519                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1520                        first_tool_call = Some(call);
1521                    } else {
1522                        continue;
1523                    }
1524                }
1525                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1526                    // Reached the beginning of the turn.
1527                    // If we had pending permission requests in the previous turn, they have been cancelled.
1528                    break;
1529                }
1530            }
1531        }
1532
1533        first_tool_call
1534    }
1535
    /// Returns the thread's current plan.
    pub const fn plan(&self) -> &Plan {
        &self.plan
    }
1539
1540    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1541        let new_entries_len = request.entries.len();
1542        let mut new_entries = request.entries.into_iter();
1543
1544        // Reuse existing markdown to prevent flickering
1545        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1546            let PlanEntry {
1547                content,
1548                priority,
1549                status,
1550            } = old;
1551            content.update(cx, |old, cx| {
1552                old.replace(new.content, cx);
1553            });
1554            *priority = new.priority;
1555            *status = new.status;
1556        }
1557        for new in new_entries {
1558            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1559        }
1560        self.plan.entries.truncate(new_entries_len);
1561
1562        cx.notify();
1563    }
1564
    /// Removes every plan entry whose status is `Completed` and notifies observers.
    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
        self.plan
            .entries
            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
        cx.notify();
    }
1571
1572    #[cfg(any(test, feature = "test-support"))]
1573    pub fn send_raw(
1574        &mut self,
1575        message: &str,
1576        cx: &mut Context<Self>,
1577    ) -> BoxFuture<'static, Result<()>> {
1578        self.send(
1579            vec![acp::ContentBlock::Text(acp::TextContent {
1580                text: message.to_string(),
1581                annotations: None,
1582                meta: None,
1583            })],
1584            cx,
1585        )
1586    }
1587
1588    pub fn send(
1589        &mut self,
1590        message: Vec<acp::ContentBlock>,
1591        cx: &mut Context<Self>,
1592    ) -> BoxFuture<'static, Result<()>> {
1593        let block = ContentBlock::new_combined(
1594            message.clone(),
1595            self.project.read(cx).languages().clone(),
1596            cx,
1597        );
1598        let request = acp::PromptRequest {
1599            prompt: message.clone(),
1600            session_id: self.session_id.clone(),
1601            meta: None,
1602        };
1603        let git_store = self.project.read(cx).git_store().clone();
1604
1605        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1606            Some(UserMessageId::new())
1607        } else {
1608            None
1609        };
1610
1611        self.run_turn(cx, async move |this, cx| {
1612            this.update(cx, |this, cx| {
1613                this.push_entry(
1614                    AgentThreadEntry::UserMessage(UserMessage {
1615                        id: message_id.clone(),
1616                        content: block,
1617                        chunks: message,
1618                        checkpoint: None,
1619                    }),
1620                    cx,
1621                );
1622            })
1623            .ok();
1624
1625            let old_checkpoint = git_store
1626                .update(cx, |git, cx| git.checkpoint(cx))?
1627                .await
1628                .context("failed to get old checkpoint")
1629                .log_err();
1630            this.update(cx, |this, cx| {
1631                if let Some((_ix, message)) = this.last_user_message() {
1632                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1633                        git_checkpoint,
1634                        show: false,
1635                    });
1636                }
1637                this.connection.prompt(message_id, request, cx)
1638            })?
1639            .await
1640        })
1641    }
1642
    /// Returns `true` if the connection provides a way to resume this session.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1646
1647    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1648        self.run_turn(cx, async move |this, cx| {
1649            this.update(cx, |this, cx| {
1650                this.connection
1651                    .resume(&this.session_id, cx)
1652                    .map(|resume| resume.run(cx))
1653            })?
1654            .context("resuming a session is not supported")?
1655            .await
1656        })
1657    }
1658
1659    fn run_turn(
1660        &mut self,
1661        cx: &mut Context<Self>,
1662        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1663    ) -> BoxFuture<'static, Result<()>> {
1664        self.clear_completed_plan_entries(cx);
1665
1666        let (tx, rx) = oneshot::channel();
1667        let cancel_task = self.cancel(cx);
1668
1669        self.send_task = Some(cx.spawn(async move |this, cx| {
1670            cancel_task.await;
1671            tx.send(f(this, cx).await).ok();
1672        }));
1673
1674        cx.spawn(async move |this, cx| {
1675            let response = rx.await;
1676
1677            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1678                .await?;
1679
1680            this.update(cx, |this, cx| {
1681                this.project
1682                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1683                match response {
1684                    Ok(Err(e)) => {
1685                        this.send_task.take();
1686                        cx.emit(AcpThreadEvent::Error);
1687                        Err(e)
1688                    }
1689                    result => {
1690                        let canceled = matches!(
1691                            result,
1692                            Ok(Ok(acp::PromptResponse {
1693                                stop_reason: acp::StopReason::Cancelled,
1694                                meta: None,
1695                            }))
1696                        );
1697
1698                        // We only take the task if the current prompt wasn't canceled.
1699                        //
1700                        // This prompt may have been canceled because another one was sent
1701                        // while it was still generating. In these cases, dropping `send_task`
1702                        // would cause the next generation to be canceled.
1703                        if !canceled {
1704                            this.send_task.take();
1705                        }
1706
1707                        // Handle refusal - distinguish between user prompt and tool call refusals
1708                        if let Ok(Ok(acp::PromptResponse {
1709                            stop_reason: acp::StopReason::Refusal,
1710                            meta: _,
1711                        })) = result
1712                        {
1713                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1714                                // Check if there's a completed tool call with results after the last user message
1715                                // This indicates the refusal is in response to tool output, not the user's prompt
1716                                let has_completed_tool_call_after_user_msg =
1717                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1718                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1719                                            // Check if the tool call has completed and has output
1720                                            matches!(tool_call.status, ToolCallStatus::Completed)
1721                                                && tool_call.raw_output.is_some()
1722                                        } else {
1723                                            false
1724                                        }
1725                                    });
1726
1727                                if has_completed_tool_call_after_user_msg {
1728                                    // Refusal is due to tool output - don't truncate, just notify
1729                                    // The model refused based on what the tool returned
1730                                    cx.emit(AcpThreadEvent::Refusal);
1731                                } else {
1732                                    // User prompt was refused - truncate back to before the user message
1733                                    let range = user_msg_ix..this.entries.len();
1734                                    if range.start < range.end {
1735                                        this.entries.truncate(user_msg_ix);
1736                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1737                                    }
1738                                    cx.emit(AcpThreadEvent::Refusal);
1739                                }
1740                            } else {
1741                                // No user message found, treat as general refusal
1742                                cx.emit(AcpThreadEvent::Refusal);
1743                            }
1744                        }
1745
1746                        cx.emit(AcpThreadEvent::Stopped);
1747                        Ok(())
1748                    }
1749                }
1750            })?
1751        })
1752        .boxed()
1753    }
1754
1755    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1756        let Some(send_task) = self.send_task.take() else {
1757            return Task::ready(());
1758        };
1759
1760        for entry in self.entries.iter_mut() {
1761            if let AgentThreadEntry::ToolCall(call) = entry {
1762                let cancel = matches!(
1763                    call.status,
1764                    ToolCallStatus::Pending
1765                        | ToolCallStatus::WaitingForConfirmation { .. }
1766                        | ToolCallStatus::InProgress
1767                );
1768
1769                if cancel {
1770                    call.status = ToolCallStatus::Canceled;
1771                }
1772            }
1773        }
1774
1775        self.connection.cancel(&self.session_id, cx);
1776
1777        // Wait for the send task to complete
1778        cx.foreground_executor().spawn(send_task)
1779    }
1780
1781    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1782    pub fn restore_checkpoint(
1783        &mut self,
1784        id: UserMessageId,
1785        cx: &mut Context<Self>,
1786    ) -> Task<Result<()>> {
1787        let Some((_, message)) = self.user_message_mut(&id) else {
1788            return Task::ready(Err(anyhow!("message not found")));
1789        };
1790
1791        let checkpoint = message
1792            .checkpoint
1793            .as_ref()
1794            .map(|c| c.git_checkpoint.clone());
1795        let rewind = self.rewind(id.clone(), cx);
1796        let git_store = self.project.read(cx).git_store().clone();
1797
1798        cx.spawn(async move |_, cx| {
1799            rewind.await?;
1800            if let Some(checkpoint) = checkpoint {
1801                git_store
1802                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1803                    .await?;
1804            }
1805
1806            Ok(())
1807        })
1808    }
1809
1810    /// Rewinds this thread to before the entry at `index`, removing it and all
1811    /// subsequent entries while rejecting any action_log changes made from that point.
1812    /// Unlike `restore_checkpoint`, this method does not restore from git.
1813    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1814        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1815            return Task::ready(Err(anyhow!("not supported")));
1816        };
1817
1818        cx.spawn(async move |this, cx| {
1819            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1820            this.update(cx, |this, cx| {
1821                if let Some((ix, _)) = this.user_message_mut(&id) {
1822                    let range = ix..this.entries.len();
1823                    this.entries.truncate(ix);
1824                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1825                }
1826                this.action_log()
1827                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1828            })?
1829            .await;
1830            Ok(())
1831        })
1832    }
1833
1834    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1835        let git_store = self.project.read(cx).git_store().clone();
1836
1837        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1838            if let Some(checkpoint) = message.checkpoint.as_ref() {
1839                checkpoint.git_checkpoint.clone()
1840            } else {
1841                return Task::ready(Ok(()));
1842            }
1843        } else {
1844            return Task::ready(Ok(()));
1845        };
1846
1847        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1848        cx.spawn(async move |this, cx| {
1849            let new_checkpoint = new_checkpoint
1850                .await
1851                .context("failed to get new checkpoint")
1852                .log_err();
1853            if let Some(new_checkpoint) = new_checkpoint {
1854                let equal = git_store
1855                    .update(cx, |git, cx| {
1856                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1857                    })?
1858                    .await
1859                    .unwrap_or(true);
1860                this.update(cx, |this, cx| {
1861                    let (ix, message) = this.last_user_message().context("no user message")?;
1862                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1863                    checkpoint.show = !equal;
1864                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865                    anyhow::Ok(())
1866                })??;
1867            }
1868
1869            Ok(())
1870        })
1871    }
1872
1873    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1874        self.entries
1875            .iter_mut()
1876            .enumerate()
1877            .rev()
1878            .find_map(|(ix, entry)| {
1879                if let AgentThreadEntry::UserMessage(message) = entry {
1880                    Some((ix, message))
1881                } else {
1882                    None
1883                }
1884            })
1885    }
1886
1887    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1888        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1889            if let AgentThreadEntry::UserMessage(message) = entry {
1890                if message.id.as_ref() == Some(id) {
1891                    Some((ix, message))
1892                } else {
1893                    None
1894                }
1895            } else {
1896                None
1897            }
1898        })
1899    }
1900
1901    pub fn read_text_file(
1902        &self,
1903        path: PathBuf,
1904        line: Option<u32>,
1905        limit: Option<u32>,
1906        reuse_shared_snapshot: bool,
1907        cx: &mut Context<Self>,
1908    ) -> Task<Result<String, acp::Error>> {
1909        // Args are 1-based, move to 0-based
1910        let line = line.unwrap_or_default().saturating_sub(1);
1911        let limit = limit.unwrap_or(u32::MAX);
1912        let project = self.project.clone();
1913        let action_log = self.action_log.clone();
1914        cx.spawn(async move |this, cx| {
1915            let load = project
1916                .update(cx, |project, cx| {
1917                    let path = project
1918                        .project_path_for_absolute_path(&path, cx)
1919                        .ok_or_else(|| {
1920                            acp::Error::resource_not_found(Some(path.display().to_string()))
1921                        })?;
1922                    Ok(project.open_buffer(path, cx))
1923                })
1924                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1925                .flatten()?;
1926
1927            let buffer = load.await?;
1928
1929            let snapshot = if reuse_shared_snapshot {
1930                this.read_with(cx, |this, _| {
1931                    this.shared_buffers.get(&buffer.clone()).cloned()
1932                })
1933                .log_err()
1934                .flatten()
1935            } else {
1936                None
1937            };
1938
1939            let snapshot = if let Some(snapshot) = snapshot {
1940                snapshot
1941            } else {
1942                action_log.update(cx, |action_log, cx| {
1943                    action_log.buffer_read(buffer.clone(), cx);
1944                })?;
1945
1946                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
1947                this.update(cx, |this, _| {
1948                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
1949                })?;
1950                snapshot
1951            };
1952
1953            let max_point = snapshot.max_point();
1954            let start_position = Point::new(line, 0);
1955
1956            if start_position > max_point {
1957                return Err(acp::Error::invalid_params().with_data(format!(
1958                    "Attempting to read beyond the end of the file, line {}:{}",
1959                    max_point.row + 1,
1960                    max_point.column
1961                )));
1962            }
1963
1964            let start = snapshot.anchor_before(start_position);
1965            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
1966
1967            project.update(cx, |project, cx| {
1968                project.set_agent_location(
1969                    Some(AgentLocation {
1970                        buffer: buffer.downgrade(),
1971                        position: start,
1972                    }),
1973                    cx,
1974                );
1975            })?;
1976
1977            Ok(snapshot.text_for_range(start..end).collect::<String>())
1978        })
1979    }
1980
    /// Replaces the contents of the file at the absolute `path` with `content`
    /// by editing its project buffer, then saves the buffer.
    ///
    /// The edits are computed as a text diff between `content` and the snapshot
    /// stored in `shared_buffers` for this buffer (falling back to the buffer's
    /// current snapshot when none is stored). The agent location is moved to the
    /// end of the last edit, the read and the edit are recorded with the action
    /// log, and the buffer is formatted first when its `format_on_save` setting
    /// is not `Off`.
    ///
    /// # Errors
    ///
    /// Fails when `path` does not map to a project path ("invalid path"), when
    /// the buffer cannot be opened, or when saving fails. A formatting failure
    /// is only logged.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Prefer the stored shared snapshot as the diff base; otherwise use
            // the buffer's current contents.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // Compute the diff on the background executor and express each
            // changed range as anchors in the base snapshot.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Point the agent location at the end of the last edit, or at the
            // minimum anchor when the diff produced no edits.
            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::MIN),
                    }),
                    cx,
                );
            })?;

            // Record the read, apply the edits, then record the edit, all
            // within a single app update.
            let format_on_save = cx.update(|cx| {
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            })?;

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                })?;
                // A formatting failure is logged and does not abort the write.
                format_task.await.log_err();

                // Report the buffer as edited again once formatting has run.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                })?;
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
2077
2078    pub fn create_terminal(
2079        &self,
2080        command: String,
2081        args: Vec<String>,
2082        extra_env: Vec<acp::EnvVariable>,
2083        cwd: Option<PathBuf>,
2084        output_byte_limit: Option<u64>,
2085        cx: &mut Context<Self>,
2086    ) -> Task<Result<Entity<Terminal>>> {
2087        let env = match &cwd {
2088            Some(dir) => self.project.update(cx, |project, cx| {
2089                let worktree = project.find_worktree(dir.as_path(), cx);
2090                let shell = TerminalSettings::get(
2091                    worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2092                        worktree_id: worktree.read(cx).id(),
2093                        path: &path,
2094                    }),
2095                    cx,
2096                )
2097                .shell
2098                .clone();
2099                project.directory_environment(&shell, dir.as_path().into(), cx)
2100            }),
2101            None => Task::ready(None).shared(),
2102        };
2103        let env = cx.spawn(async move |_, _| {
2104            let mut env = env.await.unwrap_or_default();
2105            // Disables paging for `git` and hopefully other commands
2106            env.insert("PAGER".into(), "".into());
2107            for var in extra_env {
2108                env.insert(var.name, var.value);
2109            }
2110            env
2111        });
2112
2113        let project = self.project.clone();
2114        let language_registry = project.read(cx).languages().clone();
2115        let is_windows = project.read(cx).path_style(cx).is_windows();
2116
2117        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2118        let terminal_task = cx.spawn({
2119            let terminal_id = terminal_id.clone();
2120            async move |_this, cx| {
2121                let env = env.await;
2122                let shell = project
2123                    .update(cx, |project, cx| {
2124                        project
2125                            .remote_client()
2126                            .and_then(|r| r.read(cx).default_system_shell())
2127                    })?
2128                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2129                let (task_command, task_args) =
2130                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2131                        .redirect_stdin_to_dev_null()
2132                        .build(Some(command.clone()), &args);
2133                let terminal = project
2134                    .update(cx, |project, cx| {
2135                        project.create_terminal_task(
2136                            task::SpawnInTerminal {
2137                                command: Some(task_command),
2138                                args: task_args,
2139                                cwd: cwd.clone(),
2140                                env,
2141                                ..Default::default()
2142                            },
2143                            cx,
2144                        )
2145                    })?
2146                    .await?;
2147
2148                cx.new(|cx| {
2149                    Terminal::new(
2150                        terminal_id,
2151                        &format!("{} {}", command, args.join(" ")),
2152                        cwd,
2153                        output_byte_limit.map(|l| l as usize),
2154                        terminal,
2155                        language_registry,
2156                        cx,
2157                    )
2158                })
2159            }
2160        });
2161
2162        cx.spawn(async move |this, cx| {
2163            let terminal = terminal_task.await?;
2164            this.update(cx, |this, _cx| {
2165                this.terminals.insert(terminal_id, terminal.clone());
2166                terminal
2167            })
2168        })
2169    }
2170
2171    pub fn kill_terminal(
2172        &mut self,
2173        terminal_id: acp::TerminalId,
2174        cx: &mut Context<Self>,
2175    ) -> Result<()> {
2176        self.terminals
2177            .get(&terminal_id)
2178            .context("Terminal not found")?
2179            .update(cx, |terminal, cx| {
2180                terminal.kill(cx);
2181            });
2182
2183        Ok(())
2184    }
2185
2186    pub fn release_terminal(
2187        &mut self,
2188        terminal_id: acp::TerminalId,
2189        cx: &mut Context<Self>,
2190    ) -> Result<()> {
2191        self.terminals
2192            .remove(&terminal_id)
2193            .context("Terminal not found")?
2194            .update(cx, |terminal, cx| {
2195                terminal.kill(cx);
2196            });
2197
2198        Ok(())
2199    }
2200
2201    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2202        self.terminals
2203            .get(&terminal_id)
2204            .context("Terminal not found")
2205            .cloned()
2206    }
2207
2208    pub fn to_markdown(&self, cx: &App) -> String {
2209        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2210    }
2211
2212    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2213        cx.emit(AcpThreadEvent::LoadError(error));
2214    }
2215
2216    pub fn register_terminal_created(
2217        &mut self,
2218        terminal_id: acp::TerminalId,
2219        command_label: String,
2220        working_dir: Option<PathBuf>,
2221        output_byte_limit: Option<u64>,
2222        terminal: Entity<::terminal::Terminal>,
2223        cx: &mut Context<Self>,
2224    ) -> Entity<Terminal> {
2225        let language_registry = self.project.read(cx).languages().clone();
2226
2227        let entity = cx.new(|cx| {
2228            Terminal::new(
2229                terminal_id.clone(),
2230                &command_label,
2231                working_dir.clone(),
2232                output_byte_limit.map(|l| l as usize),
2233                terminal,
2234                language_registry,
2235                cx,
2236            )
2237        });
2238        self.terminals.insert(terminal_id.clone(), entity.clone());
2239        entity
2240    }
2241}
2242
2243fn markdown_for_raw_output(
2244    raw_output: &serde_json::Value,
2245    language_registry: &Arc<LanguageRegistry>,
2246    cx: &mut App,
2247) -> Option<Entity<Markdown>> {
2248    match raw_output {
2249        serde_json::Value::Null => None,
2250        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2251            Markdown::new(
2252                value.to_string().into(),
2253                Some(language_registry.clone()),
2254                None,
2255                cx,
2256            )
2257        })),
2258        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2259            Markdown::new(
2260                value.to_string().into(),
2261                Some(language_registry.clone()),
2262                None,
2263                cx,
2264            )
2265        })),
2266        serde_json::Value::String(value) => Some(cx.new(|cx| {
2267            Markdown::new(
2268                value.clone().into(),
2269                Some(language_registry.clone()),
2270                None,
2271                cx,
2272            )
2273        })),
2274        value => Some(cx.new(|cx| {
2275            Markdown::new(
2276                format!("```json\n{}\n```", value).into(),
2277                Some(language_registry.clone()),
2278                None,
2279                cx,
2280            )
2281        })),
2282    }
2283}
2284
2285#[cfg(test)]
2286mod tests {
2287    use super::*;
2288    use anyhow::anyhow;
2289    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2290    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2291    use indoc::indoc;
2292    use project::{FakeFs, Fs};
2293    use rand::{distr, prelude::*};
2294    use serde_json::json;
2295    use settings::SettingsStore;
2296    use smol::stream::StreamExt as _;
2297    use std::{
2298        any::Any,
2299        cell::RefCell,
2300        path::Path,
2301        rc::Rc,
2302        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2303        time::Duration,
2304    };
2305    use util::path;
2306
2307    fn init_test(cx: &mut TestAppContext) {
2308        env_logger::try_init().ok();
2309        cx.update(|cx| {
2310            let settings_store = SettingsStore::test(cx);
2311            cx.set_global(settings_store);
2312            Project::init_settings(cx);
2313            language::init(cx);
2314        });
2315    }
2316
2317    #[gpui::test]
2318    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2319        init_test(cx);
2320
2321        let fs = FakeFs::new(cx.executor());
2322        let project = Project::test(fs, [], cx).await;
2323        let connection = Rc::new(FakeAgentConnection::new());
2324        let thread = cx
2325            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2326            .await
2327            .unwrap();
2328
2329        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2330
2331        // Send Output BEFORE Created - should be buffered by acp_thread
2332        thread.update(cx, |thread, cx| {
2333            thread.on_terminal_provider_event(
2334                TerminalProviderEvent::Output {
2335                    terminal_id: terminal_id.clone(),
2336                    data: b"hello buffered".to_vec(),
2337                },
2338                cx,
2339            );
2340        });
2341
2342        // Create a display-only terminal and then send Created
2343        let lower = cx.new(|cx| {
2344            let builder = ::terminal::TerminalBuilder::new_display_only(
2345                ::terminal::terminal_settings::CursorShape::default(),
2346                ::terminal::terminal_settings::AlternateScroll::On,
2347                None,
2348                0,
2349            )
2350            .unwrap();
2351            builder.subscribe(cx)
2352        });
2353
2354        thread.update(cx, |thread, cx| {
2355            thread.on_terminal_provider_event(
2356                TerminalProviderEvent::Created {
2357                    terminal_id: terminal_id.clone(),
2358                    label: "Buffered Test".to_string(),
2359                    cwd: None,
2360                    output_byte_limit: None,
2361                    terminal: lower.clone(),
2362                },
2363                cx,
2364            );
2365        });
2366
2367        // After Created, buffered Output should have been flushed into the renderer
2368        let content = thread.read_with(cx, |thread, cx| {
2369            let term = thread.terminal(terminal_id.clone()).unwrap();
2370            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2371        });
2372
2373        assert!(
2374            content.contains("hello buffered"),
2375            "expected buffered output to render, got: {content}"
2376        );
2377    }
2378
2379    #[gpui::test]
2380    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2381        init_test(cx);
2382
2383        let fs = FakeFs::new(cx.executor());
2384        let project = Project::test(fs, [], cx).await;
2385        let connection = Rc::new(FakeAgentConnection::new());
2386        let thread = cx
2387            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2388            .await
2389            .unwrap();
2390
2391        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2392
2393        // Send Output BEFORE Created
2394        thread.update(cx, |thread, cx| {
2395            thread.on_terminal_provider_event(
2396                TerminalProviderEvent::Output {
2397                    terminal_id: terminal_id.clone(),
2398                    data: b"pre-exit data".to_vec(),
2399                },
2400                cx,
2401            );
2402        });
2403
2404        // Send Exit BEFORE Created
2405        thread.update(cx, |thread, cx| {
2406            thread.on_terminal_provider_event(
2407                TerminalProviderEvent::Exit {
2408                    terminal_id: terminal_id.clone(),
2409                    status: acp::TerminalExitStatus {
2410                        exit_code: Some(0),
2411                        signal: None,
2412                        meta: None,
2413                    },
2414                },
2415                cx,
2416            );
2417        });
2418
2419        // Now create a display-only lower-level terminal and send Created
2420        let lower = cx.new(|cx| {
2421            let builder = ::terminal::TerminalBuilder::new_display_only(
2422                ::terminal::terminal_settings::CursorShape::default(),
2423                ::terminal::terminal_settings::AlternateScroll::On,
2424                None,
2425                0,
2426            )
2427            .unwrap();
2428            builder.subscribe(cx)
2429        });
2430
2431        thread.update(cx, |thread, cx| {
2432            thread.on_terminal_provider_event(
2433                TerminalProviderEvent::Created {
2434                    terminal_id: terminal_id.clone(),
2435                    label: "Buffered Exit Test".to_string(),
2436                    cwd: None,
2437                    output_byte_limit: None,
2438                    terminal: lower.clone(),
2439                },
2440                cx,
2441            );
2442        });
2443
2444        // Output should be present after Created (flushed from buffer)
2445        let content = thread.read_with(cx, |thread, cx| {
2446            let term = thread.terminal(terminal_id.clone()).unwrap();
2447            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2448        });
2449
2450        assert!(
2451            content.contains("pre-exit data"),
2452            "expected pre-exit data to render, got: {content}"
2453        );
2454    }
2455
2456    #[gpui::test]
2457    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2458        init_test(cx);
2459
2460        let fs = FakeFs::new(cx.executor());
2461        let project = Project::test(fs, [], cx).await;
2462        let connection = Rc::new(FakeAgentConnection::new());
2463        let thread = cx
2464            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2465            .await
2466            .unwrap();
2467
2468        // Test creating a new user message
2469        thread.update(cx, |thread, cx| {
2470            thread.push_user_content_block(
2471                None,
2472                acp::ContentBlock::Text(acp::TextContent {
2473                    annotations: None,
2474                    text: "Hello, ".to_string(),
2475                    meta: None,
2476                }),
2477                cx,
2478            );
2479        });
2480
2481        thread.update(cx, |thread, cx| {
2482            assert_eq!(thread.entries.len(), 1);
2483            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2484                assert_eq!(user_msg.id, None);
2485                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2486            } else {
2487                panic!("Expected UserMessage");
2488            }
2489        });
2490
2491        // Test appending to existing user message
2492        let message_1_id = UserMessageId::new();
2493        thread.update(cx, |thread, cx| {
2494            thread.push_user_content_block(
2495                Some(message_1_id.clone()),
2496                acp::ContentBlock::Text(acp::TextContent {
2497                    annotations: None,
2498                    text: "world!".to_string(),
2499                    meta: None,
2500                }),
2501                cx,
2502            );
2503        });
2504
2505        thread.update(cx, |thread, cx| {
2506            assert_eq!(thread.entries.len(), 1);
2507            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2508                assert_eq!(user_msg.id, Some(message_1_id));
2509                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2510            } else {
2511                panic!("Expected UserMessage");
2512            }
2513        });
2514
2515        // Test creating new user message after assistant message
2516        thread.update(cx, |thread, cx| {
2517            thread.push_assistant_content_block(
2518                acp::ContentBlock::Text(acp::TextContent {
2519                    annotations: None,
2520                    text: "Assistant response".to_string(),
2521                    meta: None,
2522                }),
2523                false,
2524                cx,
2525            );
2526        });
2527
2528        let message_2_id = UserMessageId::new();
2529        thread.update(cx, |thread, cx| {
2530            thread.push_user_content_block(
2531                Some(message_2_id.clone()),
2532                acp::ContentBlock::Text(acp::TextContent {
2533                    annotations: None,
2534                    text: "New user message".to_string(),
2535                    meta: None,
2536                }),
2537                cx,
2538            );
2539        });
2540
2541        thread.update(cx, |thread, cx| {
2542            assert_eq!(thread.entries.len(), 3);
2543            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2544                assert_eq!(user_msg.id, Some(message_2_id));
2545                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2546            } else {
2547                panic!("Expected UserMessage at index 2");
2548            }
2549        });
2550    }
2551
2552    #[gpui::test]
2553    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2554        init_test(cx);
2555
2556        let fs = FakeFs::new(cx.executor());
2557        let project = Project::test(fs, [], cx).await;
2558        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2559            |_, thread, mut cx| {
2560                async move {
2561                    thread.update(&mut cx, |thread, cx| {
2562                        thread
2563                            .handle_session_update(
2564                                acp::SessionUpdate::AgentThoughtChunk {
2565                                    content: "Thinking ".into(),
2566                                },
2567                                cx,
2568                            )
2569                            .unwrap();
2570                        thread
2571                            .handle_session_update(
2572                                acp::SessionUpdate::AgentThoughtChunk {
2573                                    content: "hard!".into(),
2574                                },
2575                                cx,
2576                            )
2577                            .unwrap();
2578                    })?;
2579                    Ok(acp::PromptResponse {
2580                        stop_reason: acp::StopReason::EndTurn,
2581                        meta: None,
2582                    })
2583                }
2584                .boxed_local()
2585            },
2586        ));
2587
2588        let thread = cx
2589            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2590            .await
2591            .unwrap();
2592
2593        thread
2594            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2595            .await
2596            .unwrap();
2597
2598        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2599        assert_eq!(
2600            output,
2601            indoc! {r#"
2602            ## User
2603
2604            Hello from Zed!
2605
2606            ## Assistant
2607
2608            <thinking>
2609            Thinking hard!
2610            </thinking>
2611
2612            "#}
2613        );
2614    }
2615
2616    #[gpui::test]
2617    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2618        init_test(cx);
2619
2620        let fs = FakeFs::new(cx.executor());
2621        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2622            .await;
2623        let project = Project::test(fs.clone(), [], cx).await;
2624        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2625        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2626        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2627            move |_, thread, mut cx| {
2628                let read_file_tx = read_file_tx.clone();
2629                async move {
2630                    let content = thread
2631                        .update(&mut cx, |thread, cx| {
2632                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2633                        })
2634                        .unwrap()
2635                        .await
2636                        .unwrap();
2637                    assert_eq!(content, "one\ntwo\nthree\n");
2638                    read_file_tx.take().unwrap().send(()).unwrap();
2639                    thread
2640                        .update(&mut cx, |thread, cx| {
2641                            thread.write_text_file(
2642                                path!("/tmp/foo").into(),
2643                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2644                                cx,
2645                            )
2646                        })
2647                        .unwrap()
2648                        .await
2649                        .unwrap();
2650                    Ok(acp::PromptResponse {
2651                        stop_reason: acp::StopReason::EndTurn,
2652                        meta: None,
2653                    })
2654                }
2655                .boxed_local()
2656            },
2657        ));
2658
2659        let (worktree, pathbuf) = project
2660            .update(cx, |project, cx| {
2661                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2662            })
2663            .await
2664            .unwrap();
2665        let buffer = project
2666            .update(cx, |project, cx| {
2667                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2668            })
2669            .await
2670            .unwrap();
2671
2672        let thread = cx
2673            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2674            .await
2675            .unwrap();
2676
2677        let request = thread.update(cx, |thread, cx| {
2678            thread.send_raw("Extend the count in /tmp/foo", cx)
2679        });
2680        read_file_rx.await.ok();
2681        buffer.update(cx, |buffer, cx| {
2682            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2683        });
2684        cx.run_until_parked();
2685        assert_eq!(
2686            buffer.read_with(cx, |buffer, _| buffer.text()),
2687            "zero\none\ntwo\nthree\nfour\nfive\n"
2688        );
2689        assert_eq!(
2690            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2691            "zero\none\ntwo\nthree\nfour\nfive\n"
2692        );
2693        request.await.unwrap();
2694    }
2695
2696    #[gpui::test]
2697    async fn test_reading_from_line(cx: &mut TestAppContext) {
2698        init_test(cx);
2699
2700        let fs = FakeFs::new(cx.executor());
2701        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2702            .await;
2703        let project = Project::test(fs.clone(), [], cx).await;
2704        project
2705            .update(cx, |project, cx| {
2706                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2707            })
2708            .await
2709            .unwrap();
2710
2711        let connection = Rc::new(FakeAgentConnection::new());
2712
2713        let thread = cx
2714            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2715            .await
2716            .unwrap();
2717
2718        // Whole file
2719        let content = thread
2720            .update(cx, |thread, cx| {
2721                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2722            })
2723            .await
2724            .unwrap();
2725
2726        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2727
2728        // Only start line
2729        let content = thread
2730            .update(cx, |thread, cx| {
2731                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2732            })
2733            .await
2734            .unwrap();
2735
2736        assert_eq!(content, "three\nfour\n");
2737
2738        // Only limit
2739        let content = thread
2740            .update(cx, |thread, cx| {
2741                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2742            })
2743            .await
2744            .unwrap();
2745
2746        assert_eq!(content, "one\ntwo\n");
2747
2748        // Range
2749        let content = thread
2750            .update(cx, |thread, cx| {
2751                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2752            })
2753            .await
2754            .unwrap();
2755
2756        assert_eq!(content, "two\nthree\n");
2757
2758        // Invalid
2759        let err = thread
2760            .update(cx, |thread, cx| {
2761                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2762            })
2763            .await
2764            .unwrap_err();
2765
2766        assert_eq!(
2767            err.to_string(),
2768            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2769        );
2770    }
2771
2772    #[gpui::test]
2773    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2774        init_test(cx);
2775
2776        let fs = FakeFs::new(cx.executor());
2777        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2778        let project = Project::test(fs.clone(), [], cx).await;
2779        project
2780            .update(cx, |project, cx| {
2781                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2782            })
2783            .await
2784            .unwrap();
2785
2786        let connection = Rc::new(FakeAgentConnection::new());
2787
2788        let thread = cx
2789            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2790            .await
2791            .unwrap();
2792
2793        // Whole file
2794        let content = thread
2795            .update(cx, |thread, cx| {
2796                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2797            })
2798            .await
2799            .unwrap();
2800
2801        assert_eq!(content, "");
2802
2803        // Only start line
2804        let content = thread
2805            .update(cx, |thread, cx| {
2806                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2807            })
2808            .await
2809            .unwrap();
2810
2811        assert_eq!(content, "");
2812
2813        // Only limit
2814        let content = thread
2815            .update(cx, |thread, cx| {
2816                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2817            })
2818            .await
2819            .unwrap();
2820
2821        assert_eq!(content, "");
2822
2823        // Range
2824        let content = thread
2825            .update(cx, |thread, cx| {
2826                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2827            })
2828            .await
2829            .unwrap();
2830
2831        assert_eq!(content, "");
2832
2833        // Invalid
2834        let err = thread
2835            .update(cx, |thread, cx| {
2836                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2837            })
2838            .await
2839            .unwrap_err();
2840
2841        assert_eq!(
2842            err.to_string(),
2843            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2844        );
2845    }
2846    #[gpui::test]
2847    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2848        init_test(cx);
2849
2850        let fs = FakeFs::new(cx.executor());
2851        fs.insert_tree(path!("/tmp"), json!({})).await;
2852        let project = Project::test(fs.clone(), [], cx).await;
2853        project
2854            .update(cx, |project, cx| {
2855                project.find_or_create_worktree(path!("/tmp"), true, cx)
2856            })
2857            .await
2858            .unwrap();
2859
2860        let connection = Rc::new(FakeAgentConnection::new());
2861
2862        let thread = cx
2863            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2864            .await
2865            .unwrap();
2866
2867        // Out of project file
2868        let err = thread
2869            .update(cx, |thread, cx| {
2870                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2871            })
2872            .await
2873            .unwrap_err();
2874
2875        assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2876    }
2877
2878    #[gpui::test]
2879    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2880        init_test(cx);
2881
2882        let fs = FakeFs::new(cx.executor());
2883        let project = Project::test(fs, [], cx).await;
2884        let id = acp::ToolCallId("test".into());
2885
2886        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2887            let id = id.clone();
2888            move |_, thread, mut cx| {
2889                let id = id.clone();
2890                async move {
2891                    thread
2892                        .update(&mut cx, |thread, cx| {
2893                            thread.handle_session_update(
2894                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2895                                    id: id.clone(),
2896                                    title: "Label".into(),
2897                                    kind: acp::ToolKind::Fetch,
2898                                    status: acp::ToolCallStatus::InProgress,
2899                                    content: vec![],
2900                                    locations: vec![],
2901                                    raw_input: None,
2902                                    raw_output: None,
2903                                    meta: None,
2904                                }),
2905                                cx,
2906                            )
2907                        })
2908                        .unwrap()
2909                        .unwrap();
2910                    Ok(acp::PromptResponse {
2911                        stop_reason: acp::StopReason::EndTurn,
2912                        meta: None,
2913                    })
2914                }
2915                .boxed_local()
2916            }
2917        }));
2918
2919        let thread = cx
2920            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2921            .await
2922            .unwrap();
2923
2924        let request = thread.update(cx, |thread, cx| {
2925            thread.send_raw("Fetch https://example.com", cx)
2926        });
2927
2928        run_until_first_tool_call(&thread, cx).await;
2929
2930        thread.read_with(cx, |thread, _| {
2931            assert!(matches!(
2932                thread.entries[1],
2933                AgentThreadEntry::ToolCall(ToolCall {
2934                    status: ToolCallStatus::InProgress,
2935                    ..
2936                })
2937            ));
2938        });
2939
2940        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2941
2942        thread.read_with(cx, |thread, _| {
2943            assert!(matches!(
2944                &thread.entries[1],
2945                AgentThreadEntry::ToolCall(ToolCall {
2946                    status: ToolCallStatus::Canceled,
2947                    ..
2948                })
2949            ));
2950        });
2951
2952        thread
2953            .update(cx, |thread, cx| {
2954                thread.handle_session_update(
2955                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2956                        id,
2957                        fields: acp::ToolCallUpdateFields {
2958                            status: Some(acp::ToolCallStatus::Completed),
2959                            ..Default::default()
2960                        },
2961                        meta: None,
2962                    }),
2963                    cx,
2964                )
2965            })
2966            .unwrap();
2967
2968        request.await.unwrap();
2969
2970        thread.read_with(cx, |thread, _| {
2971            assert!(matches!(
2972                thread.entries[1],
2973                AgentThreadEntry::ToolCall(ToolCall {
2974                    status: ToolCallStatus::Completed,
2975                    ..
2976                })
2977            ));
2978        });
2979    }
2980
2981    #[gpui::test]
2982    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2983        init_test(cx);
2984        let fs = FakeFs::new(cx.background_executor.clone());
2985        fs.insert_tree(path!("/test"), json!({})).await;
2986        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2987
2988        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2989            move |_, thread, mut cx| {
2990                async move {
2991                    thread
2992                        .update(&mut cx, |thread, cx| {
2993                            thread.handle_session_update(
2994                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2995                                    id: acp::ToolCallId("test".into()),
2996                                    title: "Label".into(),
2997                                    kind: acp::ToolKind::Edit,
2998                                    status: acp::ToolCallStatus::Completed,
2999                                    content: vec![acp::ToolCallContent::Diff {
3000                                        diff: acp::Diff {
3001                                            path: "/test/test.txt".into(),
3002                                            old_text: None,
3003                                            new_text: "foo".into(),
3004                                            meta: None,
3005                                        },
3006                                    }],
3007                                    locations: vec![],
3008                                    raw_input: None,
3009                                    raw_output: None,
3010                                    meta: None,
3011                                }),
3012                                cx,
3013                            )
3014                        })
3015                        .unwrap()
3016                        .unwrap();
3017                    Ok(acp::PromptResponse {
3018                        stop_reason: acp::StopReason::EndTurn,
3019                        meta: None,
3020                    })
3021                }
3022                .boxed_local()
3023            }
3024        }));
3025
3026        let thread = cx
3027            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3028            .await
3029            .unwrap();
3030
3031        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3032            .await
3033            .unwrap();
3034
3035        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3036    }
3037
3038    #[gpui::test(iterations = 10)]
3039    async fn test_checkpoints(cx: &mut TestAppContext) {
3040        init_test(cx);
3041        let fs = FakeFs::new(cx.background_executor.clone());
3042        fs.insert_tree(
3043            path!("/test"),
3044            json!({
3045                ".git": {}
3046            }),
3047        )
3048        .await;
3049        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3050
3051        let simulate_changes = Arc::new(AtomicBool::new(true));
3052        let next_filename = Arc::new(AtomicUsize::new(0));
3053        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3054            let simulate_changes = simulate_changes.clone();
3055            let next_filename = next_filename.clone();
3056            let fs = fs.clone();
3057            move |request, thread, mut cx| {
3058                let fs = fs.clone();
3059                let simulate_changes = simulate_changes.clone();
3060                let next_filename = next_filename.clone();
3061                async move {
3062                    if simulate_changes.load(SeqCst) {
3063                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3064                        fs.write(Path::new(&filename), b"").await?;
3065                    }
3066
3067                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3068                        panic!("expected text content block");
3069                    };
3070                    thread.update(&mut cx, |thread, cx| {
3071                        thread
3072                            .handle_session_update(
3073                                acp::SessionUpdate::AgentMessageChunk {
3074                                    content: content.text.to_uppercase().into(),
3075                                },
3076                                cx,
3077                            )
3078                            .unwrap();
3079                    })?;
3080                    Ok(acp::PromptResponse {
3081                        stop_reason: acp::StopReason::EndTurn,
3082                        meta: None,
3083                    })
3084                }
3085                .boxed_local()
3086            }
3087        }));
3088        let thread = cx
3089            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3090            .await
3091            .unwrap();
3092
3093        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3094            .await
3095            .unwrap();
3096        thread.read_with(cx, |thread, cx| {
3097            assert_eq!(
3098                thread.to_markdown(cx),
3099                indoc! {"
3100                    ## User (checkpoint)
3101
3102                    Lorem
3103
3104                    ## Assistant
3105
3106                    LOREM
3107
3108                "}
3109            );
3110        });
3111        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3112
3113        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3114            .await
3115            .unwrap();
3116        thread.read_with(cx, |thread, cx| {
3117            assert_eq!(
3118                thread.to_markdown(cx),
3119                indoc! {"
3120                    ## User (checkpoint)
3121
3122                    Lorem
3123
3124                    ## Assistant
3125
3126                    LOREM
3127
3128                    ## User (checkpoint)
3129
3130                    ipsum
3131
3132                    ## Assistant
3133
3134                    IPSUM
3135
3136                "}
3137            );
3138        });
3139        assert_eq!(
3140            fs.files(),
3141            vec![
3142                Path::new(path!("/test/file-0")),
3143                Path::new(path!("/test/file-1"))
3144            ]
3145        );
3146
3147        // Checkpoint isn't stored when there are no changes.
3148        simulate_changes.store(false, SeqCst);
3149        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3150            .await
3151            .unwrap();
3152        thread.read_with(cx, |thread, cx| {
3153            assert_eq!(
3154                thread.to_markdown(cx),
3155                indoc! {"
3156                    ## User (checkpoint)
3157
3158                    Lorem
3159
3160                    ## Assistant
3161
3162                    LOREM
3163
3164                    ## User (checkpoint)
3165
3166                    ipsum
3167
3168                    ## Assistant
3169
3170                    IPSUM
3171
3172                    ## User
3173
3174                    dolor
3175
3176                    ## Assistant
3177
3178                    DOLOR
3179
3180                "}
3181            );
3182        });
3183        assert_eq!(
3184            fs.files(),
3185            vec![
3186                Path::new(path!("/test/file-0")),
3187                Path::new(path!("/test/file-1"))
3188            ]
3189        );
3190
3191        // Rewinding the conversation truncates the history and restores the checkpoint.
3192        thread
3193            .update(cx, |thread, cx| {
3194                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3195                    panic!("unexpected entries {:?}", thread.entries)
3196                };
3197                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3198            })
3199            .await
3200            .unwrap();
3201        thread.read_with(cx, |thread, cx| {
3202            assert_eq!(
3203                thread.to_markdown(cx),
3204                indoc! {"
3205                    ## User (checkpoint)
3206
3207                    Lorem
3208
3209                    ## Assistant
3210
3211                    LOREM
3212
3213                "}
3214            );
3215        });
3216        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3217    }
3218
3219    #[gpui::test]
3220    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3221        use std::sync::atomic::AtomicUsize;
3222        init_test(cx);
3223
3224        let fs = FakeFs::new(cx.executor());
3225        let project = Project::test(fs, None, cx).await;
3226
3227        // Create a connection that simulates refusal after tool result
3228        let prompt_count = Arc::new(AtomicUsize::new(0));
3229        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3230            let prompt_count = prompt_count.clone();
3231            move |_request, thread, mut cx| {
3232                let count = prompt_count.fetch_add(1, SeqCst);
3233                async move {
3234                    if count == 0 {
3235                        // First prompt: Generate a tool call with result
3236                        thread.update(&mut cx, |thread, cx| {
3237                            thread
3238                                .handle_session_update(
3239                                    acp::SessionUpdate::ToolCall(acp::ToolCall {
3240                                        id: acp::ToolCallId("tool1".into()),
3241                                        title: "Test Tool".into(),
3242                                        kind: acp::ToolKind::Fetch,
3243                                        status: acp::ToolCallStatus::Completed,
3244                                        content: vec![],
3245                                        locations: vec![],
3246                                        raw_input: Some(serde_json::json!({"query": "test"})),
3247                                        raw_output: Some(
3248                                            serde_json::json!({"result": "inappropriate content"}),
3249                                        ),
3250                                        meta: None,
3251                                    }),
3252                                    cx,
3253                                )
3254                                .unwrap();
3255                        })?;
3256
3257                        // Now return refusal because of the tool result
3258                        Ok(acp::PromptResponse {
3259                            stop_reason: acp::StopReason::Refusal,
3260                            meta: None,
3261                        })
3262                    } else {
3263                        Ok(acp::PromptResponse {
3264                            stop_reason: acp::StopReason::EndTurn,
3265                            meta: None,
3266                        })
3267                    }
3268                }
3269                .boxed_local()
3270            }
3271        }));
3272
3273        let thread = cx
3274            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3275            .await
3276            .unwrap();
3277
3278        // Track if we see a Refusal event
3279        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3280        let saw_refusal_event_captured = saw_refusal_event.clone();
3281        thread.update(cx, |_thread, cx| {
3282            cx.subscribe(
3283                &thread,
3284                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3285                    if matches!(event, AcpThreadEvent::Refusal) {
3286                        *saw_refusal_event_captured.lock().unwrap() = true;
3287                    }
3288                },
3289            )
3290            .detach();
3291        });
3292
3293        // Send a user message - this will trigger tool call and then refusal
3294        let send_task = thread.update(cx, |thread, cx| {
3295            thread.send(
3296                vec![acp::ContentBlock::Text(acp::TextContent {
3297                    text: "Hello".into(),
3298                    annotations: None,
3299                    meta: None,
3300                })],
3301                cx,
3302            )
3303        });
3304        cx.background_executor.spawn(send_task).detach();
3305        cx.run_until_parked();
3306
3307        // Verify that:
3308        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3309        // 2. The user message was NOT truncated
3310        assert!(
3311            *saw_refusal_event.lock().unwrap(),
3312            "Refusal event should be emitted for tool result refusals"
3313        );
3314
3315        thread.read_with(cx, |thread, _| {
3316            let entries = thread.entries();
3317            assert!(entries.len() >= 2, "Should have user message and tool call");
3318
3319            // Verify user message is still there
3320            assert!(
3321                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3322                "User message should not be truncated"
3323            );
3324
3325            // Verify tool call is there with result
3326            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3327                assert!(
3328                    tool_call.raw_output.is_some(),
3329                    "Tool call should have output"
3330                );
3331            } else {
3332                panic!("Expected tool call at index 1");
3333            }
3334        });
3335    }
3336
3337    #[gpui::test]
3338    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3339        init_test(cx);
3340
3341        let fs = FakeFs::new(cx.executor());
3342        let project = Project::test(fs, None, cx).await;
3343
3344        let refuse_next = Arc::new(AtomicBool::new(false));
3345        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3346            let refuse_next = refuse_next.clone();
3347            move |_request, _thread, _cx| {
3348                if refuse_next.load(SeqCst) {
3349                    async move {
3350                        Ok(acp::PromptResponse {
3351                            stop_reason: acp::StopReason::Refusal,
3352                            meta: None,
3353                        })
3354                    }
3355                    .boxed_local()
3356                } else {
3357                    async move {
3358                        Ok(acp::PromptResponse {
3359                            stop_reason: acp::StopReason::EndTurn,
3360                            meta: None,
3361                        })
3362                    }
3363                    .boxed_local()
3364                }
3365            }
3366        }));
3367
3368        let thread = cx
3369            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3370            .await
3371            .unwrap();
3372
3373        // Track if we see a Refusal event
3374        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3375        let saw_refusal_event_captured = saw_refusal_event.clone();
3376        thread.update(cx, |_thread, cx| {
3377            cx.subscribe(
3378                &thread,
3379                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3380                    if matches!(event, AcpThreadEvent::Refusal) {
3381                        *saw_refusal_event_captured.lock().unwrap() = true;
3382                    }
3383                },
3384            )
3385            .detach();
3386        });
3387
3388        // Send a message that will be refused
3389        refuse_next.store(true, SeqCst);
3390        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3391            .await
3392            .unwrap();
3393
3394        // Verify that a Refusal event WAS emitted for user prompt refusal
3395        assert!(
3396            *saw_refusal_event.lock().unwrap(),
3397            "Refusal event should be emitted for user prompt refusals"
3398        );
3399
3400        // Verify the message was truncated (user prompt refusal)
3401        thread.read_with(cx, |thread, cx| {
3402            assert_eq!(thread.to_markdown(cx), "");
3403        });
3404    }
3405
3406    #[gpui::test]
3407    async fn test_refusal(cx: &mut TestAppContext) {
3408        init_test(cx);
3409        let fs = FakeFs::new(cx.background_executor.clone());
3410        fs.insert_tree(path!("/"), json!({})).await;
3411        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3412
3413        let refuse_next = Arc::new(AtomicBool::new(false));
3414        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3415            let refuse_next = refuse_next.clone();
3416            move |request, thread, mut cx| {
3417                let refuse_next = refuse_next.clone();
3418                async move {
3419                    if refuse_next.load(SeqCst) {
3420                        return Ok(acp::PromptResponse {
3421                            stop_reason: acp::StopReason::Refusal,
3422                            meta: None,
3423                        });
3424                    }
3425
3426                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3427                        panic!("expected text content block");
3428                    };
3429                    thread.update(&mut cx, |thread, cx| {
3430                        thread
3431                            .handle_session_update(
3432                                acp::SessionUpdate::AgentMessageChunk {
3433                                    content: content.text.to_uppercase().into(),
3434                                },
3435                                cx,
3436                            )
3437                            .unwrap();
3438                    })?;
3439                    Ok(acp::PromptResponse {
3440                        stop_reason: acp::StopReason::EndTurn,
3441                        meta: None,
3442                    })
3443                }
3444                .boxed_local()
3445            }
3446        }));
3447        let thread = cx
3448            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3449            .await
3450            .unwrap();
3451
3452        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3453            .await
3454            .unwrap();
3455        thread.read_with(cx, |thread, cx| {
3456            assert_eq!(
3457                thread.to_markdown(cx),
3458                indoc! {"
3459                    ## User
3460
3461                    hello
3462
3463                    ## Assistant
3464
3465                    HELLO
3466
3467                "}
3468            );
3469        });
3470
3471        // Simulate refusing the second message. The message should be truncated
3472        // when a user prompt is refused.
3473        refuse_next.store(true, SeqCst);
3474        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3475            .await
3476            .unwrap();
3477        thread.read_with(cx, |thread, cx| {
3478            assert_eq!(
3479                thread.to_markdown(cx),
3480                indoc! {"
3481                    ## User
3482
3483                    hello
3484
3485                    ## Assistant
3486
3487                    HELLO
3488
3489                "}
3490            );
3491        });
3492    }
3493
3494    async fn run_until_first_tool_call(
3495        thread: &Entity<AcpThread>,
3496        cx: &mut TestAppContext,
3497    ) -> usize {
3498        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3499
3500        let subscription = cx.update(|cx| {
3501            cx.subscribe(thread, move |thread, _, cx| {
3502                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3503                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3504                        return tx.try_send(ix).unwrap();
3505                    }
3506                }
3507            })
3508        });
3509
3510        select! {
3511            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3512                panic!("Timeout waiting for tool call")
3513            }
3514            ix = rx.next().fuse() => {
3515                drop(subscription);
3516                ix.unwrap()
3517            }
3518        }
3519    }
3520
3521    #[derive(Clone, Default)]
3522    struct FakeAgentConnection {
3523        auth_methods: Vec<acp::AuthMethod>,
3524        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3525        on_user_message: Option<
3526            Rc<
3527                dyn Fn(
3528                        acp::PromptRequest,
3529                        WeakEntity<AcpThread>,
3530                        AsyncApp,
3531                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3532                    + 'static,
3533            >,
3534        >,
3535    }
3536
3537    impl FakeAgentConnection {
3538        fn new() -> Self {
3539            Self {
3540                auth_methods: Vec::new(),
3541                on_user_message: None,
3542                sessions: Arc::default(),
3543            }
3544        }
3545
3546        #[expect(unused)]
3547        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3548            self.auth_methods = auth_methods;
3549            self
3550        }
3551
3552        fn on_user_message(
3553            mut self,
3554            handler: impl Fn(
3555                acp::PromptRequest,
3556                WeakEntity<AcpThread>,
3557                AsyncApp,
3558            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3559            + 'static,
3560        ) -> Self {
3561            self.on_user_message.replace(Rc::new(handler));
3562            self
3563        }
3564    }
3565
3566    impl AgentConnection for FakeAgentConnection {
3567        fn auth_methods(&self) -> &[acp::AuthMethod] {
3568            &self.auth_methods
3569        }
3570
3571        fn new_thread(
3572            self: Rc<Self>,
3573            project: Entity<Project>,
3574            _cwd: &Path,
3575            cx: &mut App,
3576        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3577            let session_id = acp::SessionId(
3578                rand::rng()
3579                    .sample_iter(&distr::Alphanumeric)
3580                    .take(7)
3581                    .map(char::from)
3582                    .collect::<String>()
3583                    .into(),
3584            );
3585            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3586            let thread = cx.new(|cx| {
3587                AcpThread::new(
3588                    "Test",
3589                    self.clone(),
3590                    project,
3591                    action_log,
3592                    session_id.clone(),
3593                    watch::Receiver::constant(acp::PromptCapabilities {
3594                        image: true,
3595                        audio: true,
3596                        embedded_context: true,
3597                        meta: None,
3598                    }),
3599                    cx,
3600                )
3601            });
3602            self.sessions.lock().insert(session_id, thread.downgrade());
3603            Task::ready(Ok(thread))
3604        }
3605
3606        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3607            if self.auth_methods().iter().any(|m| m.id == method) {
3608                Task::ready(Ok(()))
3609            } else {
3610                Task::ready(Err(anyhow!("Invalid Auth Method")))
3611            }
3612        }
3613
3614        fn prompt(
3615            &self,
3616            _id: Option<UserMessageId>,
3617            params: acp::PromptRequest,
3618            cx: &mut App,
3619        ) -> Task<gpui::Result<acp::PromptResponse>> {
3620            let sessions = self.sessions.lock();
3621            let thread = sessions.get(¶ms.session_id).unwrap();
3622            if let Some(handler) = &self.on_user_message {
3623                let handler = handler.clone();
3624                let thread = thread.clone();
3625                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3626            } else {
3627                Task::ready(Ok(acp::PromptResponse {
3628                    stop_reason: acp::StopReason::EndTurn,
3629                    meta: None,
3630                }))
3631            }
3632        }
3633
3634        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3635            let sessions = self.sessions.lock();
3636            let thread = sessions.get(session_id).unwrap().clone();
3637
3638            cx.spawn(async move |cx| {
3639                thread
3640                    .update(cx, |thread, cx| thread.cancel(cx))
3641                    .unwrap()
3642                    .await
3643            })
3644            .detach();
3645        }
3646
3647        fn truncate(
3648            &self,
3649            session_id: &acp::SessionId,
3650            _cx: &App,
3651        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3652            Some(Rc::new(FakeAgentSessionEditor {
3653                _session_id: session_id.clone(),
3654            }))
3655        }
3656
3657        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3658            self
3659        }
3660    }
3661
3662    struct FakeAgentSessionEditor {
3663        _session_id: acp::SessionId,
3664    }
3665
3666    impl AgentSessionTruncate for FakeAgentSessionEditor {
3667        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3668            Task::ready(Ok(()))
3669        }
3670    }
3671
3672    #[gpui::test]
3673    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3674        init_test(cx);
3675
3676        let fs = FakeFs::new(cx.executor());
3677        let project = Project::test(fs, [], cx).await;
3678        let connection = Rc::new(FakeAgentConnection::new());
3679        let thread = cx
3680            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3681            .await
3682            .unwrap();
3683
3684        // Try to update a tool call that doesn't exist
3685        let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3686        thread.update(cx, |thread, cx| {
3687            let result = thread.handle_session_update(
3688                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3689                    id: nonexistent_id.clone(),
3690                    fields: acp::ToolCallUpdateFields {
3691                        status: Some(acp::ToolCallStatus::Completed),
3692                        ..Default::default()
3693                    },
3694                    meta: None,
3695                }),
3696                cx,
3697            );
3698
3699            // The update should succeed (not return an error)
3700            assert!(result.is_ok());
3701
3702            // There should now be exactly one entry in the thread
3703            assert_eq!(thread.entries.len(), 1);
3704
3705            // The entry should be a failed tool call
3706            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3707                assert_eq!(tool_call.id, nonexistent_id);
3708                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3709                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3710
3711                // Check that the content contains the error message
3712                assert_eq!(tool_call.content.len(), 1);
3713                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3714                    match content_block {
3715                        ContentBlock::Markdown { markdown } => {
3716                            let markdown_text = markdown.read(cx).source();
3717                            assert!(markdown_text.contains("Tool call not found"));
3718                        }
3719                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3720                        ContentBlock::ResourceLink { .. } => {
3721                            panic!("Expected markdown content, got resource link")
3722                        }
3723                    }
3724                } else {
3725                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3726                }
3727            } else {
3728                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3729            }
3730        });
3731    }
3732}