acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use agent_settings::AgentSettings;
   7use collections::HashSet;
   8pub use connection::*;
   9pub use diff::*;
  10use language::language_settings::FormatOnSave;
  11pub use mention::*;
  12use project::lsp_store::{FormatTrigger, LspFormatTarget};
  13use serde::{Deserialize, Serialize};
  14use settings::Settings as _;
  15use task::{Shell, ShellBuilder};
  16pub use terminal::*;
  17
  18use action_log::{ActionLog, ActionLogTelemetry};
  19use agent_client_protocol::{self as acp};
  20use anyhow::{Context as _, Result, anyhow};
  21use editor::Bias;
  22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  24use itertools::Itertools;
  25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  26use markdown::Markdown;
  27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  28use std::collections::HashMap;
  29use std::error::Error;
  30use std::fmt::{Formatter, Write};
  31use std::ops::Range;
  32use std::process::ExitStatus;
  33use std::rc::Rc;
  34use std::time::{Duration, Instant};
  35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  36use ui::App;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
  40#[derive(Debug)]
  41pub struct UserMessage {
  42    pub id: Option<UserMessageId>,
  43    pub content: ContentBlock,
  44    pub chunks: Vec<acp::ContentBlock>,
  45    pub checkpoint: Option<Checkpoint>,
  46    pub indented: bool,
  47}
  48
  49#[derive(Debug)]
  50pub struct Checkpoint {
  51    git_checkpoint: GitStoreCheckpoint,
  52    pub show: bool,
  53}
  54
  55impl UserMessage {
  56    fn to_markdown(&self, cx: &App) -> String {
  57        let mut markdown = String::new();
  58        if self
  59            .checkpoint
  60            .as_ref()
  61            .is_some_and(|checkpoint| checkpoint.show)
  62        {
  63            writeln!(markdown, "## User (checkpoint)").unwrap();
  64        } else {
  65            writeln!(markdown, "## User").unwrap();
  66        }
  67        writeln!(markdown).unwrap();
  68        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  69        writeln!(markdown).unwrap();
  70        markdown
  71    }
  72}
  73
  74#[derive(Debug, PartialEq)]
  75pub struct AssistantMessage {
  76    pub chunks: Vec<AssistantMessageChunk>,
  77    pub indented: bool,
  78}
  79
  80impl AssistantMessage {
  81    pub fn to_markdown(&self, cx: &App) -> String {
  82        format!(
  83            "## Assistant\n\n{}\n\n",
  84            self.chunks
  85                .iter()
  86                .map(|chunk| chunk.to_markdown(cx))
  87                .join("\n\n")
  88        )
  89    }
  90}
  91
  92#[derive(Debug, PartialEq)]
  93pub enum AssistantMessageChunk {
  94    Message { block: ContentBlock },
  95    Thought { block: ContentBlock },
  96}
  97
  98impl AssistantMessageChunk {
  99    pub fn from_str(
 100        chunk: &str,
 101        language_registry: &Arc<LanguageRegistry>,
 102        path_style: PathStyle,
 103        cx: &mut App,
 104    ) -> Self {
 105        Self::Message {
 106            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 107        }
 108    }
 109
 110    fn to_markdown(&self, cx: &App) -> String {
 111        match self {
 112            Self::Message { block } => block.to_markdown(cx).to_string(),
 113            Self::Thought { block } => {
 114                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 115            }
 116        }
 117    }
 118}
 119
 120#[derive(Debug)]
 121pub enum AgentThreadEntry {
 122    UserMessage(UserMessage),
 123    AssistantMessage(AssistantMessage),
 124    ToolCall(ToolCall),
 125}
 126
 127impl AgentThreadEntry {
 128    pub fn is_indented(&self) -> bool {
 129        match self {
 130            Self::UserMessage(message) => message.indented,
 131            Self::AssistantMessage(message) => message.indented,
 132            Self::ToolCall(_) => false,
 133        }
 134    }
 135
 136    pub fn to_markdown(&self, cx: &App) -> String {
 137        match self {
 138            Self::UserMessage(message) => message.to_markdown(cx),
 139            Self::AssistantMessage(message) => message.to_markdown(cx),
 140            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 141        }
 142    }
 143
 144    pub fn user_message(&self) -> Option<&UserMessage> {
 145        if let AgentThreadEntry::UserMessage(message) = self {
 146            Some(message)
 147        } else {
 148            None
 149        }
 150    }
 151
 152    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 153        if let AgentThreadEntry::ToolCall(call) = self {
 154            itertools::Either::Left(call.diffs())
 155        } else {
 156            itertools::Either::Right(std::iter::empty())
 157        }
 158    }
 159
 160    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 161        if let AgentThreadEntry::ToolCall(call) = self {
 162            itertools::Either::Left(call.terminals())
 163        } else {
 164            itertools::Either::Right(std::iter::empty())
 165        }
 166    }
 167
 168    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 169        if let AgentThreadEntry::ToolCall(ToolCall {
 170            locations,
 171            resolved_locations,
 172            ..
 173        }) = self
 174        {
 175            Some((
 176                locations.get(ix)?.clone(),
 177                resolved_locations.get(ix)?.clone()?,
 178            ))
 179        } else {
 180            None
 181        }
 182    }
 183}
 184
 185#[derive(Debug)]
 186pub struct ToolCall {
 187    pub id: acp::ToolCallId,
 188    pub label: Entity<Markdown>,
 189    pub kind: acp::ToolKind,
 190    pub content: Vec<ToolCallContent>,
 191    pub status: ToolCallStatus,
 192    pub locations: Vec<acp::ToolCallLocation>,
 193    pub resolved_locations: Vec<Option<AgentLocation>>,
 194    pub raw_input: Option<serde_json::Value>,
 195    pub raw_input_markdown: Option<Entity<Markdown>>,
 196    pub raw_output: Option<serde_json::Value>,
 197}
 198
 199impl ToolCall {
 200    fn from_acp(
 201        tool_call: acp::ToolCall,
 202        status: ToolCallStatus,
 203        language_registry: Arc<LanguageRegistry>,
 204        path_style: PathStyle,
 205        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 206        cx: &mut App,
 207    ) -> Result<Self> {
 208        let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 209            first_line.to_owned() + ""
 210        } else {
 211            tool_call.title
 212        };
 213        let mut content = Vec::with_capacity(tool_call.content.len());
 214        for item in tool_call.content {
 215            if let Some(item) = ToolCallContent::from_acp(
 216                item,
 217                language_registry.clone(),
 218                path_style,
 219                terminals,
 220                cx,
 221            )? {
 222                content.push(item);
 223            }
 224        }
 225
 226        let raw_input_markdown = tool_call
 227            .raw_input
 228            .as_ref()
 229            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 230
 231        let result = Self {
 232            id: tool_call.tool_call_id,
 233            label: cx
 234                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 235            kind: tool_call.kind,
 236            content,
 237            locations: tool_call.locations,
 238            resolved_locations: Vec::default(),
 239            status,
 240            raw_input: tool_call.raw_input,
 241            raw_input_markdown,
 242            raw_output: tool_call.raw_output,
 243        };
 244        Ok(result)
 245    }
 246
 247    fn update_fields(
 248        &mut self,
 249        fields: acp::ToolCallUpdateFields,
 250        language_registry: Arc<LanguageRegistry>,
 251        path_style: PathStyle,
 252        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 253        cx: &mut App,
 254    ) -> Result<()> {
 255        let acp::ToolCallUpdateFields {
 256            kind,
 257            status,
 258            title,
 259            content,
 260            locations,
 261            raw_input,
 262            raw_output,
 263            ..
 264        } = fields;
 265
 266        if let Some(kind) = kind {
 267            self.kind = kind;
 268        }
 269
 270        if let Some(status) = status {
 271            self.status = status.into();
 272        }
 273
 274        if let Some(title) = title {
 275            self.label.update(cx, |label, cx| {
 276                if let Some((first_line, _)) = title.split_once("\n") {
 277                    label.replace(first_line.to_owned() + "", cx)
 278                } else {
 279                    label.replace(title, cx);
 280                }
 281            });
 282        }
 283
 284        if let Some(content) = content {
 285            let mut new_content_len = content.len();
 286            let mut content = content.into_iter();
 287
 288            // Reuse existing content if we can
 289            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 290                let valid_content =
 291                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 292                if !valid_content {
 293                    new_content_len -= 1;
 294                }
 295            }
 296            for new in content {
 297                if let Some(new) = ToolCallContent::from_acp(
 298                    new,
 299                    language_registry.clone(),
 300                    path_style,
 301                    terminals,
 302                    cx,
 303                )? {
 304                    self.content.push(new);
 305                } else {
 306                    new_content_len -= 1;
 307                }
 308            }
 309            self.content.truncate(new_content_len);
 310        }
 311
 312        if let Some(locations) = locations {
 313            self.locations = locations;
 314        }
 315
 316        if let Some(raw_input) = raw_input {
 317            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 318            self.raw_input = Some(raw_input);
 319        }
 320
 321        if let Some(raw_output) = raw_output {
 322            if self.content.is_empty()
 323                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 324            {
 325                self.content
 326                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 327                        markdown,
 328                    }));
 329            }
 330            self.raw_output = Some(raw_output);
 331        }
 332        Ok(())
 333    }
 334
 335    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 336        self.content.iter().filter_map(|content| match content {
 337            ToolCallContent::Diff(diff) => Some(diff),
 338            ToolCallContent::ContentBlock(_) => None,
 339            ToolCallContent::Terminal(_) => None,
 340        })
 341    }
 342
 343    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 344        self.content.iter().filter_map(|content| match content {
 345            ToolCallContent::Terminal(terminal) => Some(terminal),
 346            ToolCallContent::ContentBlock(_) => None,
 347            ToolCallContent::Diff(_) => None,
 348        })
 349    }
 350
 351    fn to_markdown(&self, cx: &App) -> String {
 352        let mut markdown = format!(
 353            "**Tool Call: {}**\nStatus: {}\n\n",
 354            self.label.read(cx).source(),
 355            self.status
 356        );
 357        for content in &self.content {
 358            markdown.push_str(content.to_markdown(cx).as_str());
 359            markdown.push_str("\n\n");
 360        }
 361        markdown
 362    }
 363
 364    async fn resolve_location(
 365        location: acp::ToolCallLocation,
 366        project: WeakEntity<Project>,
 367        cx: &mut AsyncApp,
 368    ) -> Option<ResolvedLocation> {
 369        let buffer = project
 370            .update(cx, |project, cx| {
 371                project
 372                    .project_path_for_absolute_path(&location.path, cx)
 373                    .map(|path| project.open_buffer(path, cx))
 374            })
 375            .ok()??;
 376        let buffer = buffer.await.log_err()?;
 377        let position = buffer
 378            .update(cx, |buffer, _| {
 379                let snapshot = buffer.snapshot();
 380                if let Some(row) = location.line {
 381                    let column = snapshot.indent_size_for_line(row).len;
 382                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 383                    snapshot.anchor_before(point)
 384                } else {
 385                    Anchor::min_for_buffer(snapshot.remote_id())
 386                }
 387            })
 388            .ok()?;
 389
 390        Some(ResolvedLocation { buffer, position })
 391    }
 392
 393    fn resolve_locations(
 394        &self,
 395        project: Entity<Project>,
 396        cx: &mut App,
 397    ) -> Task<Vec<Option<ResolvedLocation>>> {
 398        let locations = self.locations.clone();
 399        project.update(cx, |_, cx| {
 400            cx.spawn(async move |project, cx| {
 401                let mut new_locations = Vec::new();
 402                for location in locations {
 403                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 404                }
 405                new_locations
 406            })
 407        })
 408    }
 409}
 410
 411// Separate so we can hold a strong reference to the buffer
 412// for saving on the thread
 413#[derive(Clone, Debug, PartialEq, Eq)]
 414struct ResolvedLocation {
 415    buffer: Entity<Buffer>,
 416    position: Anchor,
 417}
 418
 419impl From<&ResolvedLocation> for AgentLocation {
 420    fn from(value: &ResolvedLocation) -> Self {
 421        Self {
 422            buffer: value.buffer.downgrade(),
 423            position: value.position,
 424        }
 425    }
 426}
 427
 428#[derive(Debug)]
 429pub enum ToolCallStatus {
 430    /// The tool call hasn't started running yet, but we start showing it to
 431    /// the user.
 432    Pending,
 433    /// The tool call is waiting for confirmation from the user.
 434    WaitingForConfirmation {
 435        options: Vec<acp::PermissionOption>,
 436        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 437    },
 438    /// The tool call is currently running.
 439    InProgress,
 440    /// The tool call completed successfully.
 441    Completed,
 442    /// The tool call failed.
 443    Failed,
 444    /// The user rejected the tool call.
 445    Rejected,
 446    /// The user canceled generation so the tool call was canceled.
 447    Canceled,
 448}
 449
 450impl From<acp::ToolCallStatus> for ToolCallStatus {
 451    fn from(status: acp::ToolCallStatus) -> Self {
 452        match status {
 453            acp::ToolCallStatus::Pending => Self::Pending,
 454            acp::ToolCallStatus::InProgress => Self::InProgress,
 455            acp::ToolCallStatus::Completed => Self::Completed,
 456            acp::ToolCallStatus::Failed => Self::Failed,
 457            _ => Self::Pending,
 458        }
 459    }
 460}
 461
 462impl Display for ToolCallStatus {
 463    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 464        write!(
 465            f,
 466            "{}",
 467            match self {
 468                ToolCallStatus::Pending => "Pending",
 469                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 470                ToolCallStatus::InProgress => "In Progress",
 471                ToolCallStatus::Completed => "Completed",
 472                ToolCallStatus::Failed => "Failed",
 473                ToolCallStatus::Rejected => "Rejected",
 474                ToolCallStatus::Canceled => "Canceled",
 475            }
 476        )
 477    }
 478}
 479
 480#[derive(Debug, PartialEq, Clone)]
 481pub enum ContentBlock {
 482    Empty,
 483    Markdown { markdown: Entity<Markdown> },
 484    ResourceLink { resource_link: acp::ResourceLink },
 485}
 486
 487impl ContentBlock {
 488    pub fn new(
 489        block: acp::ContentBlock,
 490        language_registry: &Arc<LanguageRegistry>,
 491        path_style: PathStyle,
 492        cx: &mut App,
 493    ) -> Self {
 494        let mut this = Self::Empty;
 495        this.append(block, language_registry, path_style, cx);
 496        this
 497    }
 498
 499    pub fn new_combined(
 500        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 501        language_registry: Arc<LanguageRegistry>,
 502        path_style: PathStyle,
 503        cx: &mut App,
 504    ) -> Self {
 505        let mut this = Self::Empty;
 506        for block in blocks {
 507            this.append(block, &language_registry, path_style, cx);
 508        }
 509        this
 510    }
 511
 512    pub fn append(
 513        &mut self,
 514        block: acp::ContentBlock,
 515        language_registry: &Arc<LanguageRegistry>,
 516        path_style: PathStyle,
 517        cx: &mut App,
 518    ) {
 519        if matches!(self, ContentBlock::Empty)
 520            && let acp::ContentBlock::ResourceLink(resource_link) = block
 521        {
 522            *self = ContentBlock::ResourceLink { resource_link };
 523            return;
 524        }
 525
 526        let new_content = self.block_string_contents(block, path_style);
 527
 528        match self {
 529            ContentBlock::Empty => {
 530                *self = Self::create_markdown_block(new_content, language_registry, cx);
 531            }
 532            ContentBlock::Markdown { markdown } => {
 533                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 534            }
 535            ContentBlock::ResourceLink { resource_link } => {
 536                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 537                let combined = format!("{}\n{}", existing_content, new_content);
 538
 539                *self = Self::create_markdown_block(combined, language_registry, cx);
 540            }
 541        }
 542    }
 543
 544    fn create_markdown_block(
 545        content: String,
 546        language_registry: &Arc<LanguageRegistry>,
 547        cx: &mut App,
 548    ) -> ContentBlock {
 549        ContentBlock::Markdown {
 550            markdown: cx
 551                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 552        }
 553    }
 554
 555    fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
 556        match block {
 557            acp::ContentBlock::Text(text_content) => text_content.text,
 558            acp::ContentBlock::ResourceLink(resource_link) => {
 559                Self::resource_link_md(&resource_link.uri, path_style)
 560            }
 561            acp::ContentBlock::Resource(acp::EmbeddedResource {
 562                resource:
 563                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 564                        uri,
 565                        ..
 566                    }),
 567                ..
 568            }) => Self::resource_link_md(&uri, path_style),
 569            acp::ContentBlock::Image(image) => Self::image_md(&image),
 570            _ => String::new(),
 571        }
 572    }
 573
 574    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 575        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 576            uri.as_link().to_string()
 577        } else {
 578            uri.to_string()
 579        }
 580    }
 581
 582    fn image_md(_image: &acp::ImageContent) -> String {
 583        "`Image`".into()
 584    }
 585
 586    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 587        match self {
 588            ContentBlock::Empty => "",
 589            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 590            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 591        }
 592    }
 593
 594    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 595        match self {
 596            ContentBlock::Empty => None,
 597            ContentBlock::Markdown { markdown } => Some(markdown),
 598            ContentBlock::ResourceLink { .. } => None,
 599        }
 600    }
 601
 602    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 603        match self {
 604            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 605            _ => None,
 606        }
 607    }
 608}
 609
 610#[derive(Debug)]
 611pub enum ToolCallContent {
 612    ContentBlock(ContentBlock),
 613    Diff(Entity<Diff>),
 614    Terminal(Entity<Terminal>),
 615}
 616
 617impl ToolCallContent {
 618    pub fn from_acp(
 619        content: acp::ToolCallContent,
 620        language_registry: Arc<LanguageRegistry>,
 621        path_style: PathStyle,
 622        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 623        cx: &mut App,
 624    ) -> Result<Option<Self>> {
 625        match content {
 626            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 627                Ok(Some(Self::ContentBlock(ContentBlock::new(
 628                    content,
 629                    &language_registry,
 630                    path_style,
 631                    cx,
 632                ))))
 633            }
 634            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 635                Diff::finalized(
 636                    diff.path.to_string_lossy().into_owned(),
 637                    diff.old_text,
 638                    diff.new_text,
 639                    language_registry,
 640                    cx,
 641                )
 642            })))),
 643            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 644                .get(&terminal_id)
 645                .cloned()
 646                .map(|terminal| Some(Self::Terminal(terminal)))
 647                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 648            _ => Ok(None),
 649        }
 650    }
 651
 652    pub fn update_from_acp(
 653        &mut self,
 654        new: acp::ToolCallContent,
 655        language_registry: Arc<LanguageRegistry>,
 656        path_style: PathStyle,
 657        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 658        cx: &mut App,
 659    ) -> Result<bool> {
 660        let needs_update = match (&self, &new) {
 661            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 662                old_diff.read(cx).needs_update(
 663                    new_diff.old_text.as_deref().unwrap_or(""),
 664                    &new_diff.new_text,
 665                    cx,
 666                )
 667            }
 668            _ => true,
 669        };
 670
 671        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 672            if needs_update {
 673                *self = update;
 674            }
 675            Ok(true)
 676        } else {
 677            Ok(false)
 678        }
 679    }
 680
 681    pub fn to_markdown(&self, cx: &App) -> String {
 682        match self {
 683            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 684            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 685            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 686        }
 687    }
 688}
 689
 690#[derive(Debug, PartialEq)]
 691pub enum ToolCallUpdate {
 692    UpdateFields(acp::ToolCallUpdate),
 693    UpdateDiff(ToolCallUpdateDiff),
 694    UpdateTerminal(ToolCallUpdateTerminal),
 695}
 696
 697impl ToolCallUpdate {
 698    fn id(&self) -> &acp::ToolCallId {
 699        match self {
 700            Self::UpdateFields(update) => &update.tool_call_id,
 701            Self::UpdateDiff(diff) => &diff.id,
 702            Self::UpdateTerminal(terminal) => &terminal.id,
 703        }
 704    }
 705}
 706
 707impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 708    fn from(update: acp::ToolCallUpdate) -> Self {
 709        Self::UpdateFields(update)
 710    }
 711}
 712
 713impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 714    fn from(diff: ToolCallUpdateDiff) -> Self {
 715        Self::UpdateDiff(diff)
 716    }
 717}
 718
 719#[derive(Debug, PartialEq)]
 720pub struct ToolCallUpdateDiff {
 721    pub id: acp::ToolCallId,
 722    pub diff: Entity<Diff>,
 723}
 724
 725impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 726    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 727        Self::UpdateTerminal(terminal)
 728    }
 729}
 730
 731#[derive(Debug, PartialEq)]
 732pub struct ToolCallUpdateTerminal {
 733    pub id: acp::ToolCallId,
 734    pub terminal: Entity<Terminal>,
 735}
 736
 737#[derive(Debug, Default)]
 738pub struct Plan {
 739    pub entries: Vec<PlanEntry>,
 740}
 741
 742#[derive(Debug)]
 743pub struct PlanStats<'a> {
 744    pub in_progress_entry: Option<&'a PlanEntry>,
 745    pub pending: u32,
 746    pub completed: u32,
 747}
 748
 749impl Plan {
 750    pub fn is_empty(&self) -> bool {
 751        self.entries.is_empty()
 752    }
 753
 754    pub fn stats(&self) -> PlanStats<'_> {
 755        let mut stats = PlanStats {
 756            in_progress_entry: None,
 757            pending: 0,
 758            completed: 0,
 759        };
 760
 761        for entry in &self.entries {
 762            match &entry.status {
 763                acp::PlanEntryStatus::Pending => {
 764                    stats.pending += 1;
 765                }
 766                acp::PlanEntryStatus::InProgress => {
 767                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 768                }
 769                acp::PlanEntryStatus::Completed => {
 770                    stats.completed += 1;
 771                }
 772                _ => {}
 773            }
 774        }
 775
 776        stats
 777    }
 778}
 779
 780#[derive(Debug)]
 781pub struct PlanEntry {
 782    pub content: Entity<Markdown>,
 783    pub priority: acp::PlanEntryPriority,
 784    pub status: acp::PlanEntryStatus,
 785}
 786
 787impl PlanEntry {
 788    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 789        Self {
 790            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 791            priority: entry.priority,
 792            status: entry.status,
 793        }
 794    }
 795}
 796
 797#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 798pub struct TokenUsage {
 799    pub max_tokens: u64,
 800    pub used_tokens: u64,
 801}
 802
 803impl TokenUsage {
 804    pub fn ratio(&self) -> TokenUsageRatio {
 805        #[cfg(debug_assertions)]
 806        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 807            .unwrap_or("0.8".to_string())
 808            .parse()
 809            .unwrap();
 810        #[cfg(not(debug_assertions))]
 811        let warning_threshold: f32 = 0.8;
 812
 813        // When the maximum is unknown because there is no selected model,
 814        // avoid showing the token limit warning.
 815        if self.max_tokens == 0 {
 816            TokenUsageRatio::Normal
 817        } else if self.used_tokens >= self.max_tokens {
 818            TokenUsageRatio::Exceeded
 819        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 820            TokenUsageRatio::Warning
 821        } else {
 822            TokenUsageRatio::Normal
 823        }
 824    }
 825}
 826
 827#[derive(Debug, Clone, PartialEq, Eq)]
 828pub enum TokenUsageRatio {
 829    Normal,
 830    Warning,
 831    Exceeded,
 832}
 833
 834#[derive(Debug, Clone)]
 835pub struct RetryStatus {
 836    pub last_error: SharedString,
 837    pub attempt: usize,
 838    pub max_attempts: usize,
 839    pub started_at: Instant,
 840    pub duration: Duration,
 841}
 842
 843pub struct AcpThread {
 844    title: SharedString,
 845    entries: Vec<AgentThreadEntry>,
 846    plan: Plan,
 847    project: Entity<Project>,
 848    action_log: Entity<ActionLog>,
 849    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 850    send_task: Option<Task<()>>,
 851    connection: Rc<dyn AgentConnection>,
 852    session_id: acp::SessionId,
 853    token_usage: Option<TokenUsage>,
 854    prompt_capabilities: acp::PromptCapabilities,
 855    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 856    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 857    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 858    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 859}
 860
 861impl From<&AcpThread> for ActionLogTelemetry {
 862    fn from(value: &AcpThread) -> Self {
 863        Self {
 864            agent_telemetry_id: value.connection().telemetry_id(),
 865            session_id: value.session_id.0.clone(),
 866        }
 867    }
 868}
 869
 870#[derive(Debug)]
 871pub enum AcpThreadEvent {
 872    NewEntry,
 873    TitleUpdated,
 874    TokenUsageUpdated,
 875    EntryUpdated(usize),
 876    EntriesRemoved(Range<usize>),
 877    ToolAuthorizationRequired,
 878    Retry(RetryStatus),
 879    Stopped,
 880    Error,
 881    LoadError(LoadError),
 882    PromptCapabilitiesUpdated,
 883    Refusal,
 884    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
 885    ModeUpdated(acp::SessionModeId),
 886}
 887
 888impl EventEmitter<AcpThreadEvent> for AcpThread {}
 889
 890#[derive(Debug, Clone)]
 891pub enum TerminalProviderEvent {
 892    Created {
 893        terminal_id: acp::TerminalId,
 894        label: String,
 895        cwd: Option<PathBuf>,
 896        output_byte_limit: Option<u64>,
 897        terminal: Entity<::terminal::Terminal>,
 898    },
 899    Output {
 900        terminal_id: acp::TerminalId,
 901        data: Vec<u8>,
 902    },
 903    TitleChanged {
 904        terminal_id: acp::TerminalId,
 905        title: String,
 906    },
 907    Exit {
 908        terminal_id: acp::TerminalId,
 909        status: acp::TerminalExitStatus,
 910    },
 911}
 912
 913#[derive(Debug, Clone)]
 914pub enum TerminalProviderCommand {
 915    WriteInput {
 916        terminal_id: acp::TerminalId,
 917        bytes: Vec<u8>,
 918    },
 919    Resize {
 920        terminal_id: acp::TerminalId,
 921        cols: u16,
 922        rows: u16,
 923    },
 924    Close {
 925        terminal_id: acp::TerminalId,
 926    },
 927}
 928
 929impl AcpThread {
 930    pub fn on_terminal_provider_event(
 931        &mut self,
 932        event: TerminalProviderEvent,
 933        cx: &mut Context<Self>,
 934    ) {
 935        match event {
 936            TerminalProviderEvent::Created {
 937                terminal_id,
 938                label,
 939                cwd,
 940                output_byte_limit,
 941                terminal,
 942            } => {
 943                let entity = self.register_terminal_created(
 944                    terminal_id.clone(),
 945                    label,
 946                    cwd,
 947                    output_byte_limit,
 948                    terminal,
 949                    cx,
 950                );
 951
 952                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
 953                    for data in chunks.drain(..) {
 954                        entity.update(cx, |term, cx| {
 955                            term.inner().update(cx, |inner, cx| {
 956                                inner.write_output(&data, cx);
 957                            })
 958                        });
 959                    }
 960                }
 961
 962                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
 963                    entity.update(cx, |_term, cx| {
 964                        cx.notify();
 965                    });
 966                }
 967
 968                cx.notify();
 969            }
 970            TerminalProviderEvent::Output { terminal_id, data } => {
 971                if let Some(entity) = self.terminals.get(&terminal_id) {
 972                    entity.update(cx, |term, cx| {
 973                        term.inner().update(cx, |inner, cx| {
 974                            inner.write_output(&data, cx);
 975                        })
 976                    });
 977                } else {
 978                    self.pending_terminal_output
 979                        .entry(terminal_id)
 980                        .or_default()
 981                        .push(data);
 982                }
 983            }
 984            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
 985                if let Some(entity) = self.terminals.get(&terminal_id) {
 986                    entity.update(cx, |term, cx| {
 987                        term.inner().update(cx, |inner, cx| {
 988                            inner.breadcrumb_text = title;
 989                            cx.emit(::terminal::Event::BreadcrumbsChanged);
 990                        })
 991                    });
 992                }
 993            }
 994            TerminalProviderEvent::Exit {
 995                terminal_id,
 996                status,
 997            } => {
 998                if let Some(entity) = self.terminals.get(&terminal_id) {
 999                    entity.update(cx, |_term, cx| {
1000                        cx.notify();
1001                    });
1002                } else {
1003                    self.pending_terminal_exit.insert(terminal_id, status);
1004                }
1005            }
1006        }
1007    }
1008}
1009
1010#[derive(PartialEq, Eq, Debug)]
1011pub enum ThreadStatus {
1012    Idle,
1013    Generating,
1014}
1015
1016#[derive(Debug, Clone)]
1017pub enum LoadError {
1018    Unsupported {
1019        command: SharedString,
1020        current_version: SharedString,
1021        minimum_version: SharedString,
1022    },
1023    FailedToInstall(SharedString),
1024    Exited {
1025        status: ExitStatus,
1026    },
1027    Other(SharedString),
1028}
1029
1030impl Display for LoadError {
1031    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1032        match self {
1033            LoadError::Unsupported {
1034                command: path,
1035                current_version,
1036                minimum_version,
1037            } => {
1038                write!(
1039                    f,
1040                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1041                )
1042            }
1043            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1044            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1045            LoadError::Other(msg) => write!(f, "{msg}"),
1046        }
1047    }
1048}
1049
1050impl Error for LoadError {}
1051
1052impl AcpThread {
1053    pub fn new(
1054        title: impl Into<SharedString>,
1055        connection: Rc<dyn AgentConnection>,
1056        project: Entity<Project>,
1057        action_log: Entity<ActionLog>,
1058        session_id: acp::SessionId,
1059        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1060        cx: &mut Context<Self>,
1061    ) -> Self {
1062        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1063        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1064            loop {
1065                let caps = prompt_capabilities_rx.recv().await?;
1066                this.update(cx, |this, cx| {
1067                    this.prompt_capabilities = caps;
1068                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1069                })?;
1070            }
1071        });
1072
1073        Self {
1074            action_log,
1075            shared_buffers: Default::default(),
1076            entries: Default::default(),
1077            plan: Default::default(),
1078            title: title.into(),
1079            project,
1080            send_task: None,
1081            connection,
1082            session_id,
1083            token_usage: None,
1084            prompt_capabilities,
1085            _observe_prompt_capabilities: task,
1086            terminals: HashMap::default(),
1087            pending_terminal_output: HashMap::default(),
1088            pending_terminal_exit: HashMap::default(),
1089        }
1090    }
1091
1092    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1093        self.prompt_capabilities.clone()
1094    }
1095
1096    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1097        &self.connection
1098    }
1099
1100    pub fn action_log(&self) -> &Entity<ActionLog> {
1101        &self.action_log
1102    }
1103
1104    pub fn project(&self) -> &Entity<Project> {
1105        &self.project
1106    }
1107
1108    pub fn title(&self) -> SharedString {
1109        self.title.clone()
1110    }
1111
1112    pub fn entries(&self) -> &[AgentThreadEntry] {
1113        &self.entries
1114    }
1115
1116    pub fn session_id(&self) -> &acp::SessionId {
1117        &self.session_id
1118    }
1119
1120    pub fn status(&self) -> ThreadStatus {
1121        if self.send_task.is_some() {
1122            ThreadStatus::Generating
1123        } else {
1124            ThreadStatus::Idle
1125        }
1126    }
1127
1128    pub fn token_usage(&self) -> Option<&TokenUsage> {
1129        self.token_usage.as_ref()
1130    }
1131
1132    pub fn has_pending_edit_tool_calls(&self) -> bool {
1133        for entry in self.entries.iter().rev() {
1134            match entry {
1135                AgentThreadEntry::UserMessage(_) => return false,
1136                AgentThreadEntry::ToolCall(
1137                    call @ ToolCall {
1138                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1139                        ..
1140                    },
1141                ) if call.diffs().next().is_some() => {
1142                    return true;
1143                }
1144                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1145            }
1146        }
1147
1148        false
1149    }
1150
1151    pub fn used_tools_since_last_user_message(&self) -> bool {
1152        for entry in self.entries.iter().rev() {
1153            match entry {
1154                AgentThreadEntry::UserMessage(..) => return false,
1155                AgentThreadEntry::AssistantMessage(..) => continue,
1156                AgentThreadEntry::ToolCall(..) => return true,
1157            }
1158        }
1159
1160        false
1161    }
1162
1163    pub fn handle_session_update(
1164        &mut self,
1165        update: acp::SessionUpdate,
1166        cx: &mut Context<Self>,
1167    ) -> Result<(), acp::Error> {
1168        match update {
1169            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1170                self.push_user_content_block(None, content, cx);
1171            }
1172            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1173                self.push_assistant_content_block(content, false, cx);
1174            }
1175            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1176                self.push_assistant_content_block(content, true, cx);
1177            }
1178            acp::SessionUpdate::ToolCall(tool_call) => {
1179                self.upsert_tool_call(tool_call, cx)?;
1180            }
1181            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1182                self.update_tool_call(tool_call_update, cx)?;
1183            }
1184            acp::SessionUpdate::Plan(plan) => {
1185                self.update_plan(plan, cx);
1186            }
1187            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1188                available_commands,
1189                ..
1190            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1191            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1192                current_mode_id,
1193                ..
1194            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1195            _ => {}
1196        }
1197        Ok(())
1198    }
1199
1200    pub fn push_user_content_block(
1201        &mut self,
1202        message_id: Option<UserMessageId>,
1203        chunk: acp::ContentBlock,
1204        cx: &mut Context<Self>,
1205    ) {
1206        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1207    }
1208
1209    pub fn push_user_content_block_with_indent(
1210        &mut self,
1211        message_id: Option<UserMessageId>,
1212        chunk: acp::ContentBlock,
1213        indented: bool,
1214        cx: &mut Context<Self>,
1215    ) {
1216        let language_registry = self.project.read(cx).languages().clone();
1217        let path_style = self.project.read(cx).path_style(cx);
1218        let entries_len = self.entries.len();
1219
1220        if let Some(last_entry) = self.entries.last_mut()
1221            && let AgentThreadEntry::UserMessage(UserMessage {
1222                id,
1223                content,
1224                chunks,
1225                indented: existing_indented,
1226                ..
1227            }) = last_entry
1228            && *existing_indented == indented
1229        {
1230            *id = message_id.or(id.take());
1231            content.append(chunk.clone(), &language_registry, path_style, cx);
1232            chunks.push(chunk);
1233            let idx = entries_len - 1;
1234            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1235        } else {
1236            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1237            self.push_entry(
1238                AgentThreadEntry::UserMessage(UserMessage {
1239                    id: message_id,
1240                    content,
1241                    chunks: vec![chunk],
1242                    checkpoint: None,
1243                    indented,
1244                }),
1245                cx,
1246            );
1247        }
1248    }
1249
1250    pub fn push_assistant_content_block(
1251        &mut self,
1252        chunk: acp::ContentBlock,
1253        is_thought: bool,
1254        cx: &mut Context<Self>,
1255    ) {
1256        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1257    }
1258
1259    pub fn push_assistant_content_block_with_indent(
1260        &mut self,
1261        chunk: acp::ContentBlock,
1262        is_thought: bool,
1263        indented: bool,
1264        cx: &mut Context<Self>,
1265    ) {
1266        let language_registry = self.project.read(cx).languages().clone();
1267        let path_style = self.project.read(cx).path_style(cx);
1268        let entries_len = self.entries.len();
1269        if let Some(last_entry) = self.entries.last_mut()
1270            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1271                chunks,
1272                indented: existing_indented,
1273            }) = last_entry
1274            && *existing_indented == indented
1275        {
1276            let idx = entries_len - 1;
1277            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1278            match (chunks.last_mut(), is_thought) {
1279                (Some(AssistantMessageChunk::Message { block }), false)
1280                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1281                    block.append(chunk, &language_registry, path_style, cx)
1282                }
1283                _ => {
1284                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1285                    if is_thought {
1286                        chunks.push(AssistantMessageChunk::Thought { block })
1287                    } else {
1288                        chunks.push(AssistantMessageChunk::Message { block })
1289                    }
1290                }
1291            }
1292        } else {
1293            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1294            let chunk = if is_thought {
1295                AssistantMessageChunk::Thought { block }
1296            } else {
1297                AssistantMessageChunk::Message { block }
1298            };
1299
1300            self.push_entry(
1301                AgentThreadEntry::AssistantMessage(AssistantMessage {
1302                    chunks: vec![chunk],
1303                    indented,
1304                }),
1305                cx,
1306            );
1307        }
1308    }
1309
1310    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1311        self.entries.push(entry);
1312        cx.emit(AcpThreadEvent::NewEntry);
1313    }
1314
1315    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1316        self.connection.set_title(&self.session_id, cx).is_some()
1317    }
1318
1319    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1320        if title != self.title {
1321            self.title = title.clone();
1322            cx.emit(AcpThreadEvent::TitleUpdated);
1323            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1324                return set_title.run(title, cx);
1325            }
1326        }
1327        Task::ready(Ok(()))
1328    }
1329
1330    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1331        self.token_usage = usage;
1332        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1333    }
1334
1335    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1336        cx.emit(AcpThreadEvent::Retry(status));
1337    }
1338
1339    pub fn update_tool_call(
1340        &mut self,
1341        update: impl Into<ToolCallUpdate>,
1342        cx: &mut Context<Self>,
1343    ) -> Result<()> {
1344        let update = update.into();
1345        let languages = self.project.read(cx).languages().clone();
1346        let path_style = self.project.read(cx).path_style(cx);
1347
1348        let ix = match self.index_for_tool_call(update.id()) {
1349            Some(ix) => ix,
1350            None => {
1351                // Tool call not found - create a failed tool call entry
1352                let failed_tool_call = ToolCall {
1353                    id: update.id().clone(),
1354                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1355                    kind: acp::ToolKind::Fetch,
1356                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1357                        "Tool call not found".into(),
1358                        &languages,
1359                        path_style,
1360                        cx,
1361                    ))],
1362                    status: ToolCallStatus::Failed,
1363                    locations: Vec::new(),
1364                    resolved_locations: Vec::new(),
1365                    raw_input: None,
1366                    raw_input_markdown: None,
1367                    raw_output: None,
1368                };
1369                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1370                return Ok(());
1371            }
1372        };
1373        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1374            unreachable!()
1375        };
1376
1377        match update {
1378            ToolCallUpdate::UpdateFields(update) => {
1379                let location_updated = update.fields.locations.is_some();
1380                call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1381                if location_updated {
1382                    self.resolve_locations(update.tool_call_id, cx);
1383                }
1384            }
1385            ToolCallUpdate::UpdateDiff(update) => {
1386                call.content.clear();
1387                call.content.push(ToolCallContent::Diff(update.diff));
1388            }
1389            ToolCallUpdate::UpdateTerminal(update) => {
1390                call.content.clear();
1391                call.content
1392                    .push(ToolCallContent::Terminal(update.terminal));
1393            }
1394        }
1395
1396        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1397
1398        Ok(())
1399    }
1400
1401    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1402    pub fn upsert_tool_call(
1403        &mut self,
1404        tool_call: acp::ToolCall,
1405        cx: &mut Context<Self>,
1406    ) -> Result<(), acp::Error> {
1407        let status = tool_call.status.into();
1408        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1409    }
1410
1411    /// Fails if id does not match an existing entry.
1412    pub fn upsert_tool_call_inner(
1413        &mut self,
1414        update: acp::ToolCallUpdate,
1415        status: ToolCallStatus,
1416        cx: &mut Context<Self>,
1417    ) -> Result<(), acp::Error> {
1418        let language_registry = self.project.read(cx).languages().clone();
1419        let path_style = self.project.read(cx).path_style(cx);
1420        let id = update.tool_call_id.clone();
1421
1422        let agent_telemetry_id = self.connection().telemetry_id();
1423        let session = self.session_id();
1424        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1425            let status = if matches!(status, ToolCallStatus::Completed) {
1426                "completed"
1427            } else {
1428                "failed"
1429            };
1430            telemetry::event!(
1431                "Agent Tool Call Completed",
1432                agent_telemetry_id,
1433                session,
1434                status
1435            );
1436        }
1437
1438        if let Some(ix) = self.index_for_tool_call(&id) {
1439            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1440                unreachable!()
1441            };
1442
1443            call.update_fields(
1444                update.fields,
1445                language_registry,
1446                path_style,
1447                &self.terminals,
1448                cx,
1449            )?;
1450            call.status = status;
1451
1452            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1453        } else {
1454            let call = ToolCall::from_acp(
1455                update.try_into()?,
1456                status,
1457                language_registry,
1458                self.project.read(cx).path_style(cx),
1459                &self.terminals,
1460                cx,
1461            )?;
1462            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1463        };
1464
1465        self.resolve_locations(id, cx);
1466        Ok(())
1467    }
1468
1469    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1470        self.entries
1471            .iter()
1472            .enumerate()
1473            .rev()
1474            .find_map(|(index, entry)| {
1475                if let AgentThreadEntry::ToolCall(tool_call) = entry
1476                    && &tool_call.id == id
1477                {
1478                    Some(index)
1479                } else {
1480                    None
1481                }
1482            })
1483    }
1484
1485    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1486        // The tool call we are looking for is typically the last one, or very close to the end.
1487        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1488        self.entries
1489            .iter_mut()
1490            .enumerate()
1491            .rev()
1492            .find_map(|(index, tool_call)| {
1493                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1494                    && &tool_call.id == id
1495                {
1496                    Some((index, tool_call))
1497                } else {
1498                    None
1499                }
1500            })
1501    }
1502
1503    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1504        self.entries
1505            .iter()
1506            .enumerate()
1507            .rev()
1508            .find_map(|(index, tool_call)| {
1509                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1510                    && &tool_call.id == id
1511                {
1512                    Some((index, tool_call))
1513                } else {
1514                    None
1515                }
1516            })
1517    }
1518
1519    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1520        let project = self.project.clone();
1521        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1522            return;
1523        };
1524        let task = tool_call.resolve_locations(project, cx);
1525        cx.spawn(async move |this, cx| {
1526            let resolved_locations = task.await;
1527
1528            this.update(cx, |this, cx| {
1529                let project = this.project.clone();
1530
1531                for location in resolved_locations.iter().flatten() {
1532                    this.shared_buffers
1533                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1534                }
1535                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1536                    return;
1537                };
1538
1539                if let Some(Some(location)) = resolved_locations.last() {
1540                    project.update(cx, |project, cx| {
1541                        let should_ignore = if let Some(agent_location) = project
1542                            .agent_location()
1543                            .filter(|agent_location| agent_location.buffer == location.buffer)
1544                        {
1545                            let snapshot = location.buffer.read(cx).snapshot();
1546                            let old_position = agent_location.position.to_point(&snapshot);
1547                            let new_position = location.position.to_point(&snapshot);
1548
1549                            // ignore this so that when we get updates from the edit tool
1550                            // the position doesn't reset to the startof line
1551                            old_position.row == new_position.row
1552                                && old_position.column > new_position.column
1553                        } else {
1554                            false
1555                        };
1556                        if !should_ignore {
1557                            project.set_agent_location(Some(location.into()), cx);
1558                        }
1559                    });
1560                }
1561
1562                let resolved_locations = resolved_locations
1563                    .iter()
1564                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1565                    .collect::<Vec<_>>();
1566
1567                if tool_call.resolved_locations != resolved_locations {
1568                    tool_call.resolved_locations = resolved_locations;
1569                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1570                }
1571            })
1572        })
1573        .detach();
1574    }
1575
1576    pub fn request_tool_call_authorization(
1577        &mut self,
1578        tool_call: acp::ToolCallUpdate,
1579        options: Vec<acp::PermissionOption>,
1580        respect_always_allow_setting: bool,
1581        cx: &mut Context<Self>,
1582    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1583        let (tx, rx) = oneshot::channel();
1584
1585        if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1586            // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1587            // some tools would (incorrectly) continue to auto-accept.
1588            if let Some(allow_once_option) = options.iter().find_map(|option| {
1589                if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1590                    Some(option.option_id.clone())
1591                } else {
1592                    None
1593                }
1594            }) {
1595                self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1596                return Ok(async {
1597                    acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
1598                        allow_once_option,
1599                    ))
1600                }
1601                .boxed());
1602            }
1603        }
1604
1605        let status = ToolCallStatus::WaitingForConfirmation {
1606            options,
1607            respond_tx: tx,
1608        };
1609
1610        self.upsert_tool_call_inner(tool_call, status, cx)?;
1611        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1612
1613        let fut = async {
1614            match rx.await {
1615                Ok(option) => acp::RequestPermissionOutcome::Selected(
1616                    acp::SelectedPermissionOutcome::new(option),
1617                ),
1618                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1619            }
1620        }
1621        .boxed();
1622
1623        Ok(fut)
1624    }
1625
1626    pub fn authorize_tool_call(
1627        &mut self,
1628        id: acp::ToolCallId,
1629        option_id: acp::PermissionOptionId,
1630        option_kind: acp::PermissionOptionKind,
1631        cx: &mut Context<Self>,
1632    ) {
1633        let Some((ix, call)) = self.tool_call_mut(&id) else {
1634            return;
1635        };
1636
1637        let new_status = match option_kind {
1638            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1639                ToolCallStatus::Rejected
1640            }
1641            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1642                ToolCallStatus::InProgress
1643            }
1644            _ => ToolCallStatus::InProgress,
1645        };
1646
1647        let curr_status = mem::replace(&mut call.status, new_status);
1648
1649        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1650            respond_tx.send(option_id).log_err();
1651        } else if cfg!(debug_assertions) {
1652            panic!("tried to authorize an already authorized tool call");
1653        }
1654
1655        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1656    }
1657
1658    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1659        let mut first_tool_call = None;
1660
1661        for entry in self.entries.iter().rev() {
1662            match &entry {
1663                AgentThreadEntry::ToolCall(call) => {
1664                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1665                        first_tool_call = Some(call);
1666                    } else {
1667                        continue;
1668                    }
1669                }
1670                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1671                    // Reached the beginning of the turn.
1672                    // If we had pending permission requests in the previous turn, they have been cancelled.
1673                    break;
1674                }
1675            }
1676        }
1677
1678        first_tool_call
1679    }
1680
1681    pub fn plan(&self) -> &Plan {
1682        &self.plan
1683    }
1684
1685    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1686        let new_entries_len = request.entries.len();
1687        let mut new_entries = request.entries.into_iter();
1688
1689        // Reuse existing markdown to prevent flickering
1690        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1691            let PlanEntry {
1692                content,
1693                priority,
1694                status,
1695            } = old;
1696            content.update(cx, |old, cx| {
1697                old.replace(new.content, cx);
1698            });
1699            *priority = new.priority;
1700            *status = new.status;
1701        }
1702        for new in new_entries {
1703            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1704        }
1705        self.plan.entries.truncate(new_entries_len);
1706
1707        cx.notify();
1708    }
1709
1710    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1711        self.plan
1712            .entries
1713            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1714        cx.notify();
1715    }
1716
1717    #[cfg(any(test, feature = "test-support"))]
1718    pub fn send_raw(
1719        &mut self,
1720        message: &str,
1721        cx: &mut Context<Self>,
1722    ) -> BoxFuture<'static, Result<()>> {
1723        self.send(vec![message.into()], cx)
1724    }
1725
1726    pub fn send(
1727        &mut self,
1728        message: Vec<acp::ContentBlock>,
1729        cx: &mut Context<Self>,
1730    ) -> BoxFuture<'static, Result<()>> {
1731        let block = ContentBlock::new_combined(
1732            message.clone(),
1733            self.project.read(cx).languages().clone(),
1734            self.project.read(cx).path_style(cx),
1735            cx,
1736        );
1737        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1738        let git_store = self.project.read(cx).git_store().clone();
1739
1740        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1741            Some(UserMessageId::new())
1742        } else {
1743            None
1744        };
1745
1746        self.run_turn(cx, async move |this, cx| {
1747            this.update(cx, |this, cx| {
1748                this.push_entry(
1749                    AgentThreadEntry::UserMessage(UserMessage {
1750                        id: message_id.clone(),
1751                        content: block,
1752                        chunks: message,
1753                        checkpoint: None,
1754                        indented: false,
1755                    }),
1756                    cx,
1757                );
1758            })
1759            .ok();
1760
1761            let old_checkpoint = git_store
1762                .update(cx, |git, cx| git.checkpoint(cx))?
1763                .await
1764                .context("failed to get old checkpoint")
1765                .log_err();
1766            this.update(cx, |this, cx| {
1767                if let Some((_ix, message)) = this.last_user_message() {
1768                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1769                        git_checkpoint,
1770                        show: false,
1771                    });
1772                }
1773                this.connection.prompt(message_id, request, cx)
1774            })?
1775            .await
1776        })
1777    }
1778
1779    pub fn can_resume(&self, cx: &App) -> bool {
1780        self.connection.resume(&self.session_id, cx).is_some()
1781    }
1782
1783    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1784        self.run_turn(cx, async move |this, cx| {
1785            this.update(cx, |this, cx| {
1786                this.connection
1787                    .resume(&this.session_id, cx)
1788                    .map(|resume| resume.run(cx))
1789            })?
1790            .context("resuming a session is not supported")?
1791            .await
1792        })
1793    }
1794
1795    fn run_turn(
1796        &mut self,
1797        cx: &mut Context<Self>,
1798        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1799    ) -> BoxFuture<'static, Result<()>> {
1800        self.clear_completed_plan_entries(cx);
1801
1802        let (tx, rx) = oneshot::channel();
1803        let cancel_task = self.cancel(cx);
1804
1805        self.send_task = Some(cx.spawn(async move |this, cx| {
1806            cancel_task.await;
1807            tx.send(f(this, cx).await).ok();
1808        }));
1809
1810        cx.spawn(async move |this, cx| {
1811            let response = rx.await;
1812
1813            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1814                .await?;
1815
1816            this.update(cx, |this, cx| {
1817                this.project
1818                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1819                match response {
1820                    Ok(Err(e)) => {
1821                        this.send_task.take();
1822                        cx.emit(AcpThreadEvent::Error);
1823                        Err(e)
1824                    }
1825                    result => {
1826                        let canceled = matches!(
1827                            result,
1828                            Ok(Ok(acp::PromptResponse {
1829                                stop_reason: acp::StopReason::Cancelled,
1830                                ..
1831                            }))
1832                        );
1833
1834                        // We only take the task if the current prompt wasn't canceled.
1835                        //
1836                        // This prompt may have been canceled because another one was sent
1837                        // while it was still generating. In these cases, dropping `send_task`
1838                        // would cause the next generation to be canceled.
1839                        if !canceled {
1840                            this.send_task.take();
1841                        }
1842
1843                        // Handle refusal - distinguish between user prompt and tool call refusals
1844                        if let Ok(Ok(acp::PromptResponse {
1845                            stop_reason: acp::StopReason::Refusal,
1846                            ..
1847                        })) = result
1848                        {
1849                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1850                                // Check if there's a completed tool call with results after the last user message
1851                                // This indicates the refusal is in response to tool output, not the user's prompt
1852                                let has_completed_tool_call_after_user_msg =
1853                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1854                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1855                                            // Check if the tool call has completed and has output
1856                                            matches!(tool_call.status, ToolCallStatus::Completed)
1857                                                && tool_call.raw_output.is_some()
1858                                        } else {
1859                                            false
1860                                        }
1861                                    });
1862
1863                                if has_completed_tool_call_after_user_msg {
1864                                    // Refusal is due to tool output - don't truncate, just notify
1865                                    // The model refused based on what the tool returned
1866                                    cx.emit(AcpThreadEvent::Refusal);
1867                                } else {
1868                                    // User prompt was refused - truncate back to before the user message
1869                                    let range = user_msg_ix..this.entries.len();
1870                                    if range.start < range.end {
1871                                        this.entries.truncate(user_msg_ix);
1872                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
1873                                    }
1874                                    cx.emit(AcpThreadEvent::Refusal);
1875                                }
1876                            } else {
1877                                // No user message found, treat as general refusal
1878                                cx.emit(AcpThreadEvent::Refusal);
1879                            }
1880                        }
1881
1882                        cx.emit(AcpThreadEvent::Stopped);
1883                        Ok(())
1884                    }
1885                }
1886            })?
1887        })
1888        .boxed()
1889    }
1890
1891    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1892        let Some(send_task) = self.send_task.take() else {
1893            return Task::ready(());
1894        };
1895
1896        for entry in self.entries.iter_mut() {
1897            if let AgentThreadEntry::ToolCall(call) = entry {
1898                let cancel = matches!(
1899                    call.status,
1900                    ToolCallStatus::Pending
1901                        | ToolCallStatus::WaitingForConfirmation { .. }
1902                        | ToolCallStatus::InProgress
1903                );
1904
1905                if cancel {
1906                    call.status = ToolCallStatus::Canceled;
1907                }
1908            }
1909        }
1910
1911        self.connection.cancel(&self.session_id, cx);
1912
1913        // Wait for the send task to complete
1914        cx.foreground_executor().spawn(send_task)
1915    }
1916
1917    /// Restores the git working tree to the state at the given checkpoint (if one exists)
1918    pub fn restore_checkpoint(
1919        &mut self,
1920        id: UserMessageId,
1921        cx: &mut Context<Self>,
1922    ) -> Task<Result<()>> {
1923        let Some((_, message)) = self.user_message_mut(&id) else {
1924            return Task::ready(Err(anyhow!("message not found")));
1925        };
1926
1927        let checkpoint = message
1928            .checkpoint
1929            .as_ref()
1930            .map(|c| c.git_checkpoint.clone());
1931
1932        // Cancel any in-progress generation before restoring
1933        let cancel_task = self.cancel(cx);
1934        let rewind = self.rewind(id.clone(), cx);
1935        let git_store = self.project.read(cx).git_store().clone();
1936
1937        cx.spawn(async move |_, cx| {
1938            cancel_task.await;
1939            rewind.await?;
1940            if let Some(checkpoint) = checkpoint {
1941                git_store
1942                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1943                    .await?;
1944            }
1945
1946            Ok(())
1947        })
1948    }
1949
1950    /// Rewinds this thread to before the entry at `index`, removing it and all
1951    /// subsequent entries while rejecting any action_log changes made from that point.
1952    /// Unlike `restore_checkpoint`, this method does not restore from git.
1953    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1954        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1955            return Task::ready(Err(anyhow!("not supported")));
1956        };
1957
1958        let telemetry = ActionLogTelemetry::from(&*self);
1959        cx.spawn(async move |this, cx| {
1960            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1961            this.update(cx, |this, cx| {
1962                if let Some((ix, _)) = this.user_message_mut(&id) {
1963                    // Collect all terminals from entries that will be removed
1964                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
1965                        .iter()
1966                        .flat_map(|entry| entry.terminals())
1967                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
1968                        .collect();
1969
1970                    let range = ix..this.entries.len();
1971                    this.entries.truncate(ix);
1972                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1973
1974                    // Kill and remove the terminals
1975                    for terminal_id in terminals_to_remove {
1976                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
1977                            terminal.update(cx, |terminal, cx| {
1978                                terminal.kill(cx);
1979                            });
1980                        }
1981                    }
1982                }
1983                this.action_log().update(cx, |action_log, cx| {
1984                    action_log.reject_all_edits(Some(telemetry), cx)
1985                })
1986            })?
1987            .await;
1988            Ok(())
1989        })
1990    }
1991
1992    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1993        let git_store = self.project.read(cx).git_store().clone();
1994
1995        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1996            if let Some(checkpoint) = message.checkpoint.as_ref() {
1997                checkpoint.git_checkpoint.clone()
1998            } else {
1999                return Task::ready(Ok(()));
2000            }
2001        } else {
2002            return Task::ready(Ok(()));
2003        };
2004
2005        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2006        cx.spawn(async move |this, cx| {
2007            let new_checkpoint = new_checkpoint
2008                .await
2009                .context("failed to get new checkpoint")
2010                .log_err();
2011            if let Some(new_checkpoint) = new_checkpoint {
2012                let equal = git_store
2013                    .update(cx, |git, cx| {
2014                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2015                    })?
2016                    .await
2017                    .unwrap_or(true);
2018                this.update(cx, |this, cx| {
2019                    let (ix, message) = this.last_user_message().context("no user message")?;
2020                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
2021                    checkpoint.show = !equal;
2022                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
2023                    anyhow::Ok(())
2024                })??;
2025            }
2026
2027            Ok(())
2028        })
2029    }
2030
2031    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2032        self.entries
2033            .iter_mut()
2034            .enumerate()
2035            .rev()
2036            .find_map(|(ix, entry)| {
2037                if let AgentThreadEntry::UserMessage(message) = entry {
2038                    Some((ix, message))
2039                } else {
2040                    None
2041                }
2042            })
2043    }
2044
2045    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2046        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2047            if let AgentThreadEntry::UserMessage(message) = entry {
2048                if message.id.as_ref() == Some(id) {
2049                    Some((ix, message))
2050                } else {
2051                    None
2052                }
2053            } else {
2054                None
2055            }
2056        })
2057    }
2058
2059    pub fn read_text_file(
2060        &self,
2061        path: PathBuf,
2062        line: Option<u32>,
2063        limit: Option<u32>,
2064        reuse_shared_snapshot: bool,
2065        cx: &mut Context<Self>,
2066    ) -> Task<Result<String, acp::Error>> {
2067        // Args are 1-based, move to 0-based
2068        let line = line.unwrap_or_default().saturating_sub(1);
2069        let limit = limit.unwrap_or(u32::MAX);
2070        let project = self.project.clone();
2071        let action_log = self.action_log.clone();
2072        cx.spawn(async move |this, cx| {
2073            let load = project
2074                .update(cx, |project, cx| {
2075                    let path = project
2076                        .project_path_for_absolute_path(&path, cx)
2077                        .ok_or_else(|| {
2078                            acp::Error::resource_not_found(Some(path.display().to_string()))
2079                        })?;
2080                    Ok(project.open_buffer(path, cx))
2081                })
2082                .map_err(|e| acp::Error::internal_error().data(e.to_string()))
2083                .flatten()?;
2084
2085            let buffer = load.await?;
2086
2087            let snapshot = if reuse_shared_snapshot {
2088                this.read_with(cx, |this, _| {
2089                    this.shared_buffers.get(&buffer.clone()).cloned()
2090                })
2091                .log_err()
2092                .flatten()
2093            } else {
2094                None
2095            };
2096
2097            let snapshot = if let Some(snapshot) = snapshot {
2098                snapshot
2099            } else {
2100                action_log.update(cx, |action_log, cx| {
2101                    action_log.buffer_read(buffer.clone(), cx);
2102                })?;
2103
2104                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2105                this.update(cx, |this, _| {
2106                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2107                })?;
2108                snapshot
2109            };
2110
2111            let max_point = snapshot.max_point();
2112            let start_position = Point::new(line, 0);
2113
2114            if start_position > max_point {
2115                return Err(acp::Error::invalid_params().data(format!(
2116                    "Attempting to read beyond the end of the file, line {}:{}",
2117                    max_point.row + 1,
2118                    max_point.column
2119                )));
2120            }
2121
2122            let start = snapshot.anchor_before(start_position);
2123            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2124
2125            project.update(cx, |project, cx| {
2126                project.set_agent_location(
2127                    Some(AgentLocation {
2128                        buffer: buffer.downgrade(),
2129                        position: start,
2130                    }),
2131                    cx,
2132                );
2133            })?;
2134
2135            Ok(snapshot.text_for_range(start..end).collect::<String>())
2136        })
2137    }
2138
2139    pub fn write_text_file(
2140        &self,
2141        path: PathBuf,
2142        content: String,
2143        cx: &mut Context<Self>,
2144    ) -> Task<Result<()>> {
2145        let project = self.project.clone();
2146        let action_log = self.action_log.clone();
2147        cx.spawn(async move |this, cx| {
2148            let load = project.update(cx, |project, cx| {
2149                let path = project
2150                    .project_path_for_absolute_path(&path, cx)
2151                    .context("invalid path")?;
2152                anyhow::Ok(project.open_buffer(path, cx))
2153            });
2154            let buffer = load??.await?;
2155            let snapshot = this.update(cx, |this, cx| {
2156                this.shared_buffers
2157                    .get(&buffer)
2158                    .cloned()
2159                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2160            })?;
2161            let edits = cx
2162                .background_executor()
2163                .spawn(async move {
2164                    let old_text = snapshot.text();
2165                    text_diff(old_text.as_str(), &content)
2166                        .into_iter()
2167                        .map(|(range, replacement)| {
2168                            (
2169                                snapshot.anchor_after(range.start)
2170                                    ..snapshot.anchor_before(range.end),
2171                                replacement,
2172                            )
2173                        })
2174                        .collect::<Vec<_>>()
2175                })
2176                .await;
2177
2178            project.update(cx, |project, cx| {
2179                project.set_agent_location(
2180                    Some(AgentLocation {
2181                        buffer: buffer.downgrade(),
2182                        position: edits
2183                            .last()
2184                            .map(|(range, _)| range.end)
2185                            .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2186                    }),
2187                    cx,
2188                );
2189            })?;
2190
2191            let format_on_save = cx.update(|cx| {
2192                action_log.update(cx, |action_log, cx| {
2193                    action_log.buffer_read(buffer.clone(), cx);
2194                });
2195
2196                let format_on_save = buffer.update(cx, |buffer, cx| {
2197                    buffer.edit(edits, None, cx);
2198
2199                    let settings = language::language_settings::language_settings(
2200                        buffer.language().map(|l| l.name()),
2201                        buffer.file(),
2202                        cx,
2203                    );
2204
2205                    settings.format_on_save != FormatOnSave::Off
2206                });
2207                action_log.update(cx, |action_log, cx| {
2208                    action_log.buffer_edited(buffer.clone(), cx);
2209                });
2210                format_on_save
2211            })?;
2212
2213            if format_on_save {
2214                let format_task = project.update(cx, |project, cx| {
2215                    project.format(
2216                        HashSet::from_iter([buffer.clone()]),
2217                        LspFormatTarget::Buffers,
2218                        false,
2219                        FormatTrigger::Save,
2220                        cx,
2221                    )
2222                })?;
2223                format_task.await.log_err();
2224
2225                action_log.update(cx, |action_log, cx| {
2226                    action_log.buffer_edited(buffer.clone(), cx);
2227                })?;
2228            }
2229
2230            project
2231                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2232                .await
2233        })
2234    }
2235
2236    pub fn create_terminal(
2237        &self,
2238        command: String,
2239        args: Vec<String>,
2240        extra_env: Vec<acp::EnvVariable>,
2241        cwd: Option<PathBuf>,
2242        output_byte_limit: Option<u64>,
2243        cx: &mut Context<Self>,
2244    ) -> Task<Result<Entity<Terminal>>> {
2245        let env = match &cwd {
2246            Some(dir) => self.project.update(cx, |project, cx| {
2247                project.environment().update(cx, |env, cx| {
2248                    env.directory_environment(dir.as_path().into(), cx)
2249                })
2250            }),
2251            None => Task::ready(None).shared(),
2252        };
2253        let env = cx.spawn(async move |_, _| {
2254            let mut env = env.await.unwrap_or_default();
2255            // Disables paging for `git` and hopefully other commands
2256            env.insert("PAGER".into(), "".into());
2257            for var in extra_env {
2258                env.insert(var.name, var.value);
2259            }
2260            env
2261        });
2262
2263        let project = self.project.clone();
2264        let language_registry = project.read(cx).languages().clone();
2265        let is_windows = project.read(cx).path_style(cx).is_windows();
2266
2267        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2268        let terminal_task = cx.spawn({
2269            let terminal_id = terminal_id.clone();
2270            async move |_this, cx| {
2271                let env = env.await;
2272                let shell = project
2273                    .update(cx, |project, cx| {
2274                        project
2275                            .remote_client()
2276                            .and_then(|r| r.read(cx).default_system_shell())
2277                    })?
2278                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2279                let (task_command, task_args) =
2280                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2281                        .redirect_stdin_to_dev_null()
2282                        .build(Some(command.clone()), &args);
2283                let terminal = project
2284                    .update(cx, |project, cx| {
2285                        project.create_terminal_task(
2286                            task::SpawnInTerminal {
2287                                command: Some(task_command),
2288                                args: task_args,
2289                                cwd: cwd.clone(),
2290                                env,
2291                                ..Default::default()
2292                            },
2293                            cx,
2294                        )
2295                    })?
2296                    .await?;
2297
2298                cx.new(|cx| {
2299                    Terminal::new(
2300                        terminal_id,
2301                        &format!("{} {}", command, args.join(" ")),
2302                        cwd,
2303                        output_byte_limit.map(|l| l as usize),
2304                        terminal,
2305                        language_registry,
2306                        cx,
2307                    )
2308                })
2309            }
2310        });
2311
2312        cx.spawn(async move |this, cx| {
2313            let terminal = terminal_task.await?;
2314            this.update(cx, |this, _cx| {
2315                this.terminals.insert(terminal_id, terminal.clone());
2316                terminal
2317            })
2318        })
2319    }
2320
2321    pub fn kill_terminal(
2322        &mut self,
2323        terminal_id: acp::TerminalId,
2324        cx: &mut Context<Self>,
2325    ) -> Result<()> {
2326        self.terminals
2327            .get(&terminal_id)
2328            .context("Terminal not found")?
2329            .update(cx, |terminal, cx| {
2330                terminal.kill(cx);
2331            });
2332
2333        Ok(())
2334    }
2335
2336    pub fn release_terminal(
2337        &mut self,
2338        terminal_id: acp::TerminalId,
2339        cx: &mut Context<Self>,
2340    ) -> Result<()> {
2341        self.terminals
2342            .remove(&terminal_id)
2343            .context("Terminal not found")?
2344            .update(cx, |terminal, cx| {
2345                terminal.kill(cx);
2346            });
2347
2348        Ok(())
2349    }
2350
2351    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2352        self.terminals
2353            .get(&terminal_id)
2354            .context("Terminal not found")
2355            .cloned()
2356    }
2357
2358    pub fn to_markdown(&self, cx: &App) -> String {
2359        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2360    }
2361
2362    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2363        cx.emit(AcpThreadEvent::LoadError(error));
2364    }
2365
2366    pub fn register_terminal_created(
2367        &mut self,
2368        terminal_id: acp::TerminalId,
2369        command_label: String,
2370        working_dir: Option<PathBuf>,
2371        output_byte_limit: Option<u64>,
2372        terminal: Entity<::terminal::Terminal>,
2373        cx: &mut Context<Self>,
2374    ) -> Entity<Terminal> {
2375        let language_registry = self.project.read(cx).languages().clone();
2376
2377        let entity = cx.new(|cx| {
2378            Terminal::new(
2379                terminal_id.clone(),
2380                &command_label,
2381                working_dir.clone(),
2382                output_byte_limit.map(|l| l as usize),
2383                terminal,
2384                language_registry,
2385                cx,
2386            )
2387        });
2388        self.terminals.insert(terminal_id.clone(), entity.clone());
2389        entity
2390    }
2391}
2392
2393fn markdown_for_raw_output(
2394    raw_output: &serde_json::Value,
2395    language_registry: &Arc<LanguageRegistry>,
2396    cx: &mut App,
2397) -> Option<Entity<Markdown>> {
2398    match raw_output {
2399        serde_json::Value::Null => None,
2400        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2401            Markdown::new(
2402                value.to_string().into(),
2403                Some(language_registry.clone()),
2404                None,
2405                cx,
2406            )
2407        })),
2408        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2409            Markdown::new(
2410                value.to_string().into(),
2411                Some(language_registry.clone()),
2412                None,
2413                cx,
2414            )
2415        })),
2416        serde_json::Value::String(value) => Some(cx.new(|cx| {
2417            Markdown::new(
2418                value.clone().into(),
2419                Some(language_registry.clone()),
2420                None,
2421                cx,
2422            )
2423        })),
2424        value => Some(cx.new(|cx| {
2425            Markdown::new(
2426                format!("```json\n{}\n```", value).into(),
2427                Some(language_registry.clone()),
2428                None,
2429                cx,
2430            )
2431        })),
2432    }
2433}
2434
2435#[cfg(test)]
2436mod tests {
2437    use super::*;
2438    use anyhow::anyhow;
2439    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2440    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2441    use indoc::indoc;
2442    use project::{FakeFs, Fs};
2443    use rand::{distr, prelude::*};
2444    use serde_json::json;
2445    use settings::SettingsStore;
2446    use smol::stream::StreamExt as _;
2447    use std::{
2448        any::Any,
2449        cell::RefCell,
2450        path::Path,
2451        rc::Rc,
2452        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2453        time::Duration,
2454    };
2455    use util::path;
2456
2457    fn init_test(cx: &mut TestAppContext) {
2458        env_logger::try_init().ok();
2459        cx.update(|cx| {
2460            let settings_store = SettingsStore::test(cx);
2461            cx.set_global(settings_store);
2462        });
2463    }
2464
2465    #[gpui::test]
2466    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2467        init_test(cx);
2468
2469        let fs = FakeFs::new(cx.executor());
2470        let project = Project::test(fs, [], cx).await;
2471        let connection = Rc::new(FakeAgentConnection::new());
2472        let thread = cx
2473            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2474            .await
2475            .unwrap();
2476
2477        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2478
2479        // Send Output BEFORE Created - should be buffered by acp_thread
2480        thread.update(cx, |thread, cx| {
2481            thread.on_terminal_provider_event(
2482                TerminalProviderEvent::Output {
2483                    terminal_id: terminal_id.clone(),
2484                    data: b"hello buffered".to_vec(),
2485                },
2486                cx,
2487            );
2488        });
2489
2490        // Create a display-only terminal and then send Created
2491        let lower = cx.new(|cx| {
2492            let builder = ::terminal::TerminalBuilder::new_display_only(
2493                ::terminal::terminal_settings::CursorShape::default(),
2494                ::terminal::terminal_settings::AlternateScroll::On,
2495                None,
2496                0,
2497            )
2498            .unwrap();
2499            builder.subscribe(cx)
2500        });
2501
2502        thread.update(cx, |thread, cx| {
2503            thread.on_terminal_provider_event(
2504                TerminalProviderEvent::Created {
2505                    terminal_id: terminal_id.clone(),
2506                    label: "Buffered Test".to_string(),
2507                    cwd: None,
2508                    output_byte_limit: None,
2509                    terminal: lower.clone(),
2510                },
2511                cx,
2512            );
2513        });
2514
2515        // After Created, buffered Output should have been flushed into the renderer
2516        let content = thread.read_with(cx, |thread, cx| {
2517            let term = thread.terminal(terminal_id.clone()).unwrap();
2518            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2519        });
2520
2521        assert!(
2522            content.contains("hello buffered"),
2523            "expected buffered output to render, got: {content}"
2524        );
2525    }
2526
2527    #[gpui::test]
2528    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2529        init_test(cx);
2530
2531        let fs = FakeFs::new(cx.executor());
2532        let project = Project::test(fs, [], cx).await;
2533        let connection = Rc::new(FakeAgentConnection::new());
2534        let thread = cx
2535            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2536            .await
2537            .unwrap();
2538
2539        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2540
2541        // Send Output BEFORE Created
2542        thread.update(cx, |thread, cx| {
2543            thread.on_terminal_provider_event(
2544                TerminalProviderEvent::Output {
2545                    terminal_id: terminal_id.clone(),
2546                    data: b"pre-exit data".to_vec(),
2547                },
2548                cx,
2549            );
2550        });
2551
2552        // Send Exit BEFORE Created
2553        thread.update(cx, |thread, cx| {
2554            thread.on_terminal_provider_event(
2555                TerminalProviderEvent::Exit {
2556                    terminal_id: terminal_id.clone(),
2557                    status: acp::TerminalExitStatus::new().exit_code(0),
2558                },
2559                cx,
2560            );
2561        });
2562
2563        // Now create a display-only lower-level terminal and send Created
2564        let lower = cx.new(|cx| {
2565            let builder = ::terminal::TerminalBuilder::new_display_only(
2566                ::terminal::terminal_settings::CursorShape::default(),
2567                ::terminal::terminal_settings::AlternateScroll::On,
2568                None,
2569                0,
2570            )
2571            .unwrap();
2572            builder.subscribe(cx)
2573        });
2574
2575        thread.update(cx, |thread, cx| {
2576            thread.on_terminal_provider_event(
2577                TerminalProviderEvent::Created {
2578                    terminal_id: terminal_id.clone(),
2579                    label: "Buffered Exit Test".to_string(),
2580                    cwd: None,
2581                    output_byte_limit: None,
2582                    terminal: lower.clone(),
2583                },
2584                cx,
2585            );
2586        });
2587
2588        // Output should be present after Created (flushed from buffer)
2589        let content = thread.read_with(cx, |thread, cx| {
2590            let term = thread.terminal(terminal_id.clone()).unwrap();
2591            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2592        });
2593
2594        assert!(
2595            content.contains("pre-exit data"),
2596            "expected pre-exit data to render, got: {content}"
2597        );
2598    }
2599
2600    #[gpui::test]
2601    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2602        init_test(cx);
2603
2604        let fs = FakeFs::new(cx.executor());
2605        let project = Project::test(fs, [], cx).await;
2606        let connection = Rc::new(FakeAgentConnection::new());
2607        let thread = cx
2608            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2609            .await
2610            .unwrap();
2611
2612        // Test creating a new user message
2613        thread.update(cx, |thread, cx| {
2614            thread.push_user_content_block(None, "Hello, ".into(), cx);
2615        });
2616
2617        thread.update(cx, |thread, cx| {
2618            assert_eq!(thread.entries.len(), 1);
2619            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2620                assert_eq!(user_msg.id, None);
2621                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2622            } else {
2623                panic!("Expected UserMessage");
2624            }
2625        });
2626
2627        // Test appending to existing user message
2628        let message_1_id = UserMessageId::new();
2629        thread.update(cx, |thread, cx| {
2630            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2631        });
2632
2633        thread.update(cx, |thread, cx| {
2634            assert_eq!(thread.entries.len(), 1);
2635            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2636                assert_eq!(user_msg.id, Some(message_1_id));
2637                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2638            } else {
2639                panic!("Expected UserMessage");
2640            }
2641        });
2642
2643        // Test creating new user message after assistant message
2644        thread.update(cx, |thread, cx| {
2645            thread.push_assistant_content_block("Assistant response".into(), false, cx);
2646        });
2647
2648        let message_2_id = UserMessageId::new();
2649        thread.update(cx, |thread, cx| {
2650            thread.push_user_content_block(
2651                Some(message_2_id.clone()),
2652                "New user message".into(),
2653                cx,
2654            );
2655        });
2656
2657        thread.update(cx, |thread, cx| {
2658            assert_eq!(thread.entries.len(), 3);
2659            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2660                assert_eq!(user_msg.id, Some(message_2_id));
2661                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2662            } else {
2663                panic!("Expected UserMessage at index 2");
2664            }
2665        });
2666    }
2667
2668    #[gpui::test]
2669    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2670        init_test(cx);
2671
2672        let fs = FakeFs::new(cx.executor());
2673        let project = Project::test(fs, [], cx).await;
2674        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2675            |_, thread, mut cx| {
2676                async move {
2677                    thread.update(&mut cx, |thread, cx| {
2678                        thread
2679                            .handle_session_update(
2680                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2681                                    "Thinking ".into(),
2682                                )),
2683                                cx,
2684                            )
2685                            .unwrap();
2686                        thread
2687                            .handle_session_update(
2688                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2689                                    "hard!".into(),
2690                                )),
2691                                cx,
2692                            )
2693                            .unwrap();
2694                    })?;
2695                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2696                }
2697                .boxed_local()
2698            },
2699        ));
2700
2701        let thread = cx
2702            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2703            .await
2704            .unwrap();
2705
2706        thread
2707            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2708            .await
2709            .unwrap();
2710
2711        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2712        assert_eq!(
2713            output,
2714            indoc! {r#"
2715            ## User
2716
2717            Hello from Zed!
2718
2719            ## Assistant
2720
2721            <thinking>
2722            Thinking hard!
2723            </thinking>
2724
2725            "#}
2726        );
2727    }
2728
2729    #[gpui::test]
2730    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2731        init_test(cx);
2732
2733        let fs = FakeFs::new(cx.executor());
2734        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2735            .await;
2736        let project = Project::test(fs.clone(), [], cx).await;
2737        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2738        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2739        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2740            move |_, thread, mut cx| {
2741                let read_file_tx = read_file_tx.clone();
2742                async move {
2743                    let content = thread
2744                        .update(&mut cx, |thread, cx| {
2745                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2746                        })
2747                        .unwrap()
2748                        .await
2749                        .unwrap();
2750                    assert_eq!(content, "one\ntwo\nthree\n");
2751                    read_file_tx.take().unwrap().send(()).unwrap();
2752                    thread
2753                        .update(&mut cx, |thread, cx| {
2754                            thread.write_text_file(
2755                                path!("/tmp/foo").into(),
2756                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
2757                                cx,
2758                            )
2759                        })
2760                        .unwrap()
2761                        .await
2762                        .unwrap();
2763                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2764                }
2765                .boxed_local()
2766            },
2767        ));
2768
2769        let (worktree, pathbuf) = project
2770            .update(cx, |project, cx| {
2771                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2772            })
2773            .await
2774            .unwrap();
2775        let buffer = project
2776            .update(cx, |project, cx| {
2777                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2778            })
2779            .await
2780            .unwrap();
2781
2782        let thread = cx
2783            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2784            .await
2785            .unwrap();
2786
2787        let request = thread.update(cx, |thread, cx| {
2788            thread.send_raw("Extend the count in /tmp/foo", cx)
2789        });
2790        read_file_rx.await.ok();
2791        buffer.update(cx, |buffer, cx| {
2792            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2793        });
2794        cx.run_until_parked();
2795        assert_eq!(
2796            buffer.read_with(cx, |buffer, _| buffer.text()),
2797            "zero\none\ntwo\nthree\nfour\nfive\n"
2798        );
2799        assert_eq!(
2800            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2801            "zero\none\ntwo\nthree\nfour\nfive\n"
2802        );
2803        request.await.unwrap();
2804    }
2805
2806    #[gpui::test]
2807    async fn test_reading_from_line(cx: &mut TestAppContext) {
2808        init_test(cx);
2809
2810        let fs = FakeFs::new(cx.executor());
2811        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2812            .await;
2813        let project = Project::test(fs.clone(), [], cx).await;
2814        project
2815            .update(cx, |project, cx| {
2816                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2817            })
2818            .await
2819            .unwrap();
2820
2821        let connection = Rc::new(FakeAgentConnection::new());
2822
2823        let thread = cx
2824            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2825            .await
2826            .unwrap();
2827
2828        // Whole file
2829        let content = thread
2830            .update(cx, |thread, cx| {
2831                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2832            })
2833            .await
2834            .unwrap();
2835
2836        assert_eq!(content, "one\ntwo\nthree\nfour\n");
2837
2838        // Only start line
2839        let content = thread
2840            .update(cx, |thread, cx| {
2841                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2842            })
2843            .await
2844            .unwrap();
2845
2846        assert_eq!(content, "three\nfour\n");
2847
2848        // Only limit
2849        let content = thread
2850            .update(cx, |thread, cx| {
2851                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2852            })
2853            .await
2854            .unwrap();
2855
2856        assert_eq!(content, "one\ntwo\n");
2857
2858        // Range
2859        let content = thread
2860            .update(cx, |thread, cx| {
2861                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2862            })
2863            .await
2864            .unwrap();
2865
2866        assert_eq!(content, "two\nthree\n");
2867
2868        // Invalid
2869        let err = thread
2870            .update(cx, |thread, cx| {
2871                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2872            })
2873            .await
2874            .unwrap_err();
2875
2876        assert_eq!(
2877            err.to_string(),
2878            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2879        );
2880    }
2881
2882    #[gpui::test]
2883    async fn test_reading_empty_file(cx: &mut TestAppContext) {
2884        init_test(cx);
2885
2886        let fs = FakeFs::new(cx.executor());
2887        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2888        let project = Project::test(fs.clone(), [], cx).await;
2889        project
2890            .update(cx, |project, cx| {
2891                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2892            })
2893            .await
2894            .unwrap();
2895
2896        let connection = Rc::new(FakeAgentConnection::new());
2897
2898        let thread = cx
2899            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2900            .await
2901            .unwrap();
2902
2903        // Whole file
2904        let content = thread
2905            .update(cx, |thread, cx| {
2906                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2907            })
2908            .await
2909            .unwrap();
2910
2911        assert_eq!(content, "");
2912
2913        // Only start line
2914        let content = thread
2915            .update(cx, |thread, cx| {
2916                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2917            })
2918            .await
2919            .unwrap();
2920
2921        assert_eq!(content, "");
2922
2923        // Only limit
2924        let content = thread
2925            .update(cx, |thread, cx| {
2926                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2927            })
2928            .await
2929            .unwrap();
2930
2931        assert_eq!(content, "");
2932
2933        // Range
2934        let content = thread
2935            .update(cx, |thread, cx| {
2936                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2937            })
2938            .await
2939            .unwrap();
2940
2941        assert_eq!(content, "");
2942
2943        // Invalid
2944        let err = thread
2945            .update(cx, |thread, cx| {
2946                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2947            })
2948            .await
2949            .unwrap_err();
2950
2951        assert_eq!(
2952            err.to_string(),
2953            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2954        );
2955    }
2956    #[gpui::test]
2957    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2958        init_test(cx);
2959
2960        let fs = FakeFs::new(cx.executor());
2961        fs.insert_tree(path!("/tmp"), json!({})).await;
2962        let project = Project::test(fs.clone(), [], cx).await;
2963        project
2964            .update(cx, |project, cx| {
2965                project.find_or_create_worktree(path!("/tmp"), true, cx)
2966            })
2967            .await
2968            .unwrap();
2969
2970        let connection = Rc::new(FakeAgentConnection::new());
2971
2972        let thread = cx
2973            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2974            .await
2975            .unwrap();
2976
2977        // Out of project file
2978        let err = thread
2979            .update(cx, |thread, cx| {
2980                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2981            })
2982            .await
2983            .unwrap_err();
2984
2985        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
2986    }
2987
2988    #[gpui::test]
2989    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2990        init_test(cx);
2991
2992        let fs = FakeFs::new(cx.executor());
2993        let project = Project::test(fs, [], cx).await;
2994        let id = acp::ToolCallId::new("test");
2995
2996        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2997            let id = id.clone();
2998            move |_, thread, mut cx| {
2999                let id = id.clone();
3000                async move {
3001                    thread
3002                        .update(&mut cx, |thread, cx| {
3003                            thread.handle_session_update(
3004                                acp::SessionUpdate::ToolCall(
3005                                    acp::ToolCall::new(id.clone(), "Label")
3006                                        .kind(acp::ToolKind::Fetch)
3007                                        .status(acp::ToolCallStatus::InProgress),
3008                                ),
3009                                cx,
3010                            )
3011                        })
3012                        .unwrap()
3013                        .unwrap();
3014                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3015                }
3016                .boxed_local()
3017            }
3018        }));
3019
3020        let thread = cx
3021            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3022            .await
3023            .unwrap();
3024
3025        let request = thread.update(cx, |thread, cx| {
3026            thread.send_raw("Fetch https://example.com", cx)
3027        });
3028
3029        run_until_first_tool_call(&thread, cx).await;
3030
3031        thread.read_with(cx, |thread, _| {
3032            assert!(matches!(
3033                thread.entries[1],
3034                AgentThreadEntry::ToolCall(ToolCall {
3035                    status: ToolCallStatus::InProgress,
3036                    ..
3037                })
3038            ));
3039        });
3040
3041        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3042
3043        thread.read_with(cx, |thread, _| {
3044            assert!(matches!(
3045                &thread.entries[1],
3046                AgentThreadEntry::ToolCall(ToolCall {
3047                    status: ToolCallStatus::Canceled,
3048                    ..
3049                })
3050            ));
3051        });
3052
3053        thread
3054            .update(cx, |thread, cx| {
3055                thread.handle_session_update(
3056                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3057                        id,
3058                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3059                    )),
3060                    cx,
3061                )
3062            })
3063            .unwrap();
3064
3065        request.await.unwrap();
3066
3067        thread.read_with(cx, |thread, _| {
3068            assert!(matches!(
3069                thread.entries[1],
3070                AgentThreadEntry::ToolCall(ToolCall {
3071                    status: ToolCallStatus::Completed,
3072                    ..
3073                })
3074            ));
3075        });
3076    }
3077
3078    #[gpui::test]
3079    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3080        init_test(cx);
3081        let fs = FakeFs::new(cx.background_executor.clone());
3082        fs.insert_tree(path!("/test"), json!({})).await;
3083        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3084
3085        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3086            move |_, thread, mut cx| {
3087                async move {
3088                    thread
3089                        .update(&mut cx, |thread, cx| {
3090                            thread.handle_session_update(
3091                                acp::SessionUpdate::ToolCall(
3092                                    acp::ToolCall::new("test", "Label")
3093                                        .kind(acp::ToolKind::Edit)
3094                                        .status(acp::ToolCallStatus::Completed)
3095                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3096                                            "/test/test.txt",
3097                                            "foo",
3098                                        ))]),
3099                                ),
3100                                cx,
3101                            )
3102                        })
3103                        .unwrap()
3104                        .unwrap();
3105                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3106                }
3107                .boxed_local()
3108            }
3109        }));
3110
3111        let thread = cx
3112            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3113            .await
3114            .unwrap();
3115
3116        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3117            .await
3118            .unwrap();
3119
3120        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3121    }
3122
3123    #[gpui::test(iterations = 10)]
3124    async fn test_checkpoints(cx: &mut TestAppContext) {
3125        init_test(cx);
3126        let fs = FakeFs::new(cx.background_executor.clone());
3127        fs.insert_tree(
3128            path!("/test"),
3129            json!({
3130                ".git": {}
3131            }),
3132        )
3133        .await;
3134        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3135
3136        let simulate_changes = Arc::new(AtomicBool::new(true));
3137        let next_filename = Arc::new(AtomicUsize::new(0));
3138        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3139            let simulate_changes = simulate_changes.clone();
3140            let next_filename = next_filename.clone();
3141            let fs = fs.clone();
3142            move |request, thread, mut cx| {
3143                let fs = fs.clone();
3144                let simulate_changes = simulate_changes.clone();
3145                let next_filename = next_filename.clone();
3146                async move {
3147                    if simulate_changes.load(SeqCst) {
3148                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3149                        fs.write(Path::new(&filename), b"").await?;
3150                    }
3151
3152                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3153                        panic!("expected text content block");
3154                    };
3155                    thread.update(&mut cx, |thread, cx| {
3156                        thread
3157                            .handle_session_update(
3158                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3159                                    content.text.to_uppercase().into(),
3160                                )),
3161                                cx,
3162                            )
3163                            .unwrap();
3164                    })?;
3165                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3166                }
3167                .boxed_local()
3168            }
3169        }));
3170        let thread = cx
3171            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3172            .await
3173            .unwrap();
3174
3175        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3176            .await
3177            .unwrap();
3178        thread.read_with(cx, |thread, cx| {
3179            assert_eq!(
3180                thread.to_markdown(cx),
3181                indoc! {"
3182                    ## User (checkpoint)
3183
3184                    Lorem
3185
3186                    ## Assistant
3187
3188                    LOREM
3189
3190                "}
3191            );
3192        });
3193        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3194
3195        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3196            .await
3197            .unwrap();
3198        thread.read_with(cx, |thread, cx| {
3199            assert_eq!(
3200                thread.to_markdown(cx),
3201                indoc! {"
3202                    ## User (checkpoint)
3203
3204                    Lorem
3205
3206                    ## Assistant
3207
3208                    LOREM
3209
3210                    ## User (checkpoint)
3211
3212                    ipsum
3213
3214                    ## Assistant
3215
3216                    IPSUM
3217
3218                "}
3219            );
3220        });
3221        assert_eq!(
3222            fs.files(),
3223            vec![
3224                Path::new(path!("/test/file-0")),
3225                Path::new(path!("/test/file-1"))
3226            ]
3227        );
3228
3229        // Checkpoint isn't stored when there are no changes.
3230        simulate_changes.store(false, SeqCst);
3231        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3232            .await
3233            .unwrap();
3234        thread.read_with(cx, |thread, cx| {
3235            assert_eq!(
3236                thread.to_markdown(cx),
3237                indoc! {"
3238                    ## User (checkpoint)
3239
3240                    Lorem
3241
3242                    ## Assistant
3243
3244                    LOREM
3245
3246                    ## User (checkpoint)
3247
3248                    ipsum
3249
3250                    ## Assistant
3251
3252                    IPSUM
3253
3254                    ## User
3255
3256                    dolor
3257
3258                    ## Assistant
3259
3260                    DOLOR
3261
3262                "}
3263            );
3264        });
3265        assert_eq!(
3266            fs.files(),
3267            vec![
3268                Path::new(path!("/test/file-0")),
3269                Path::new(path!("/test/file-1"))
3270            ]
3271        );
3272
3273        // Rewinding the conversation truncates the history and restores the checkpoint.
3274        thread
3275            .update(cx, |thread, cx| {
3276                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3277                    panic!("unexpected entries {:?}", thread.entries)
3278                };
3279                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3280            })
3281            .await
3282            .unwrap();
3283        thread.read_with(cx, |thread, cx| {
3284            assert_eq!(
3285                thread.to_markdown(cx),
3286                indoc! {"
3287                    ## User (checkpoint)
3288
3289                    Lorem
3290
3291                    ## Assistant
3292
3293                    LOREM
3294
3295                "}
3296            );
3297        });
3298        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3299    }
3300
3301    #[gpui::test]
3302    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3303        use std::sync::atomic::AtomicUsize;
3304        init_test(cx);
3305
3306        let fs = FakeFs::new(cx.executor());
3307        let project = Project::test(fs, None, cx).await;
3308
3309        // Create a connection that simulates refusal after tool result
3310        let prompt_count = Arc::new(AtomicUsize::new(0));
3311        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3312            let prompt_count = prompt_count.clone();
3313            move |_request, thread, mut cx| {
3314                let count = prompt_count.fetch_add(1, SeqCst);
3315                async move {
3316                    if count == 0 {
3317                        // First prompt: Generate a tool call with result
3318                        thread.update(&mut cx, |thread, cx| {
3319                            thread
3320                                .handle_session_update(
3321                                    acp::SessionUpdate::ToolCall(
3322                                        acp::ToolCall::new("tool1", "Test Tool")
3323                                            .kind(acp::ToolKind::Fetch)
3324                                            .status(acp::ToolCallStatus::Completed)
3325                                            .raw_input(serde_json::json!({"query": "test"}))
3326                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
3327                                    ),
3328                                    cx,
3329                                )
3330                                .unwrap();
3331                        })?;
3332
3333                        // Now return refusal because of the tool result
3334                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3335                    } else {
3336                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3337                    }
3338                }
3339                .boxed_local()
3340            }
3341        }));
3342
3343        let thread = cx
3344            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3345            .await
3346            .unwrap();
3347
3348        // Track if we see a Refusal event
3349        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3350        let saw_refusal_event_captured = saw_refusal_event.clone();
3351        thread.update(cx, |_thread, cx| {
3352            cx.subscribe(
3353                &thread,
3354                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3355                    if matches!(event, AcpThreadEvent::Refusal) {
3356                        *saw_refusal_event_captured.lock().unwrap() = true;
3357                    }
3358                },
3359            )
3360            .detach();
3361        });
3362
3363        // Send a user message - this will trigger tool call and then refusal
3364        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3365        cx.background_executor.spawn(send_task).detach();
3366        cx.run_until_parked();
3367
3368        // Verify that:
3369        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3370        // 2. The user message was NOT truncated
3371        assert!(
3372            *saw_refusal_event.lock().unwrap(),
3373            "Refusal event should be emitted for tool result refusals"
3374        );
3375
3376        thread.read_with(cx, |thread, _| {
3377            let entries = thread.entries();
3378            assert!(entries.len() >= 2, "Should have user message and tool call");
3379
3380            // Verify user message is still there
3381            assert!(
3382                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3383                "User message should not be truncated"
3384            );
3385
3386            // Verify tool call is there with result
3387            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3388                assert!(
3389                    tool_call.raw_output.is_some(),
3390                    "Tool call should have output"
3391                );
3392            } else {
3393                panic!("Expected tool call at index 1");
3394            }
3395        });
3396    }
3397
3398    #[gpui::test]
3399    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3400        init_test(cx);
3401
3402        let fs = FakeFs::new(cx.executor());
3403        let project = Project::test(fs, None, cx).await;
3404
3405        let refuse_next = Arc::new(AtomicBool::new(false));
3406        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3407            let refuse_next = refuse_next.clone();
3408            move |_request, _thread, _cx| {
3409                if refuse_next.load(SeqCst) {
3410                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3411                        .boxed_local()
3412                } else {
3413                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3414                        .boxed_local()
3415                }
3416            }
3417        }));
3418
3419        let thread = cx
3420            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3421            .await
3422            .unwrap();
3423
3424        // Track if we see a Refusal event
3425        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3426        let saw_refusal_event_captured = saw_refusal_event.clone();
3427        thread.update(cx, |_thread, cx| {
3428            cx.subscribe(
3429                &thread,
3430                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3431                    if matches!(event, AcpThreadEvent::Refusal) {
3432                        *saw_refusal_event_captured.lock().unwrap() = true;
3433                    }
3434                },
3435            )
3436            .detach();
3437        });
3438
3439        // Send a message that will be refused
3440        refuse_next.store(true, SeqCst);
3441        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3442            .await
3443            .unwrap();
3444
3445        // Verify that a Refusal event WAS emitted for user prompt refusal
3446        assert!(
3447            *saw_refusal_event.lock().unwrap(),
3448            "Refusal event should be emitted for user prompt refusals"
3449        );
3450
3451        // Verify the message was truncated (user prompt refusal)
3452        thread.read_with(cx, |thread, cx| {
3453            assert_eq!(thread.to_markdown(cx), "");
3454        });
3455    }
3456
3457    #[gpui::test]
3458    async fn test_refusal(cx: &mut TestAppContext) {
3459        init_test(cx);
3460        let fs = FakeFs::new(cx.background_executor.clone());
3461        fs.insert_tree(path!("/"), json!({})).await;
3462        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3463
3464        let refuse_next = Arc::new(AtomicBool::new(false));
3465        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3466            let refuse_next = refuse_next.clone();
3467            move |request, thread, mut cx| {
3468                let refuse_next = refuse_next.clone();
3469                async move {
3470                    if refuse_next.load(SeqCst) {
3471                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3472                    }
3473
3474                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3475                        panic!("expected text content block");
3476                    };
3477                    thread.update(&mut cx, |thread, cx| {
3478                        thread
3479                            .handle_session_update(
3480                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3481                                    content.text.to_uppercase().into(),
3482                                )),
3483                                cx,
3484                            )
3485                            .unwrap();
3486                    })?;
3487                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3488                }
3489                .boxed_local()
3490            }
3491        }));
3492        let thread = cx
3493            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3494            .await
3495            .unwrap();
3496
3497        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3498            .await
3499            .unwrap();
3500        thread.read_with(cx, |thread, cx| {
3501            assert_eq!(
3502                thread.to_markdown(cx),
3503                indoc! {"
3504                    ## User
3505
3506                    hello
3507
3508                    ## Assistant
3509
3510                    HELLO
3511
3512                "}
3513            );
3514        });
3515
3516        // Simulate refusing the second message. The message should be truncated
3517        // when a user prompt is refused.
3518        refuse_next.store(true, SeqCst);
3519        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3520            .await
3521            .unwrap();
3522        thread.read_with(cx, |thread, cx| {
3523            assert_eq!(
3524                thread.to_markdown(cx),
3525                indoc! {"
3526                    ## User
3527
3528                    hello
3529
3530                    ## Assistant
3531
3532                    HELLO
3533
3534                "}
3535            );
3536        });
3537    }
3538
3539    async fn run_until_first_tool_call(
3540        thread: &Entity<AcpThread>,
3541        cx: &mut TestAppContext,
3542    ) -> usize {
3543        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3544
3545        let subscription = cx.update(|cx| {
3546            cx.subscribe(thread, move |thread, _, cx| {
3547                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3548                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3549                        return tx.try_send(ix).unwrap();
3550                    }
3551                }
3552            })
3553        });
3554
3555        select! {
3556            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3557                panic!("Timeout waiting for tool call")
3558            }
3559            ix = rx.next().fuse() => {
3560                drop(subscription);
3561                ix.unwrap()
3562            }
3563        }
3564    }
3565
3566    #[derive(Clone, Default)]
3567    struct FakeAgentConnection {
3568        auth_methods: Vec<acp::AuthMethod>,
3569        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3570        on_user_message: Option<
3571            Rc<
3572                dyn Fn(
3573                        acp::PromptRequest,
3574                        WeakEntity<AcpThread>,
3575                        AsyncApp,
3576                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3577                    + 'static,
3578            >,
3579        >,
3580    }
3581
3582    impl FakeAgentConnection {
3583        fn new() -> Self {
3584            Self {
3585                auth_methods: Vec::new(),
3586                on_user_message: None,
3587                sessions: Arc::default(),
3588            }
3589        }
3590
3591        #[expect(unused)]
3592        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3593            self.auth_methods = auth_methods;
3594            self
3595        }
3596
3597        fn on_user_message(
3598            mut self,
3599            handler: impl Fn(
3600                acp::PromptRequest,
3601                WeakEntity<AcpThread>,
3602                AsyncApp,
3603            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3604            + 'static,
3605        ) -> Self {
3606            self.on_user_message.replace(Rc::new(handler));
3607            self
3608        }
3609    }
3610
3611    impl AgentConnection for FakeAgentConnection {
3612        fn telemetry_id(&self) -> SharedString {
3613            "fake".into()
3614        }
3615
3616        fn auth_methods(&self) -> &[acp::AuthMethod] {
3617            &self.auth_methods
3618        }
3619
3620        fn new_thread(
3621            self: Rc<Self>,
3622            project: Entity<Project>,
3623            _cwd: &Path,
3624            cx: &mut App,
3625        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3626            let session_id = acp::SessionId::new(
3627                rand::rng()
3628                    .sample_iter(&distr::Alphanumeric)
3629                    .take(7)
3630                    .map(char::from)
3631                    .collect::<String>(),
3632            );
3633            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3634            let thread = cx.new(|cx| {
3635                AcpThread::new(
3636                    "Test",
3637                    self.clone(),
3638                    project,
3639                    action_log,
3640                    session_id.clone(),
3641                    watch::Receiver::constant(
3642                        acp::PromptCapabilities::new()
3643                            .image(true)
3644                            .audio(true)
3645                            .embedded_context(true),
3646                    ),
3647                    cx,
3648                )
3649            });
3650            self.sessions.lock().insert(session_id, thread.downgrade());
3651            Task::ready(Ok(thread))
3652        }
3653
3654        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3655            if self.auth_methods().iter().any(|m| m.id == method) {
3656                Task::ready(Ok(()))
3657            } else {
3658                Task::ready(Err(anyhow!("Invalid Auth Method")))
3659            }
3660        }
3661
3662        fn prompt(
3663            &self,
3664            _id: Option<UserMessageId>,
3665            params: acp::PromptRequest,
3666            cx: &mut App,
3667        ) -> Task<gpui::Result<acp::PromptResponse>> {
3668            let sessions = self.sessions.lock();
3669            let thread = sessions.get(&params.session_id).unwrap();
3670            if let Some(handler) = &self.on_user_message {
3671                let handler = handler.clone();
3672                let thread = thread.clone();
3673                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3674            } else {
3675                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3676            }
3677        }
3678
3679        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3680            let sessions = self.sessions.lock();
3681            let thread = sessions.get(session_id).unwrap().clone();
3682
3683            cx.spawn(async move |cx| {
3684                thread
3685                    .update(cx, |thread, cx| thread.cancel(cx))
3686                    .unwrap()
3687                    .await
3688            })
3689            .detach();
3690        }
3691
3692        fn truncate(
3693            &self,
3694            session_id: &acp::SessionId,
3695            _cx: &App,
3696        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3697            Some(Rc::new(FakeAgentSessionEditor {
3698                _session_id: session_id.clone(),
3699            }))
3700        }
3701
3702        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3703            self
3704        }
3705    }
3706
3707    struct FakeAgentSessionEditor {
3708        _session_id: acp::SessionId,
3709    }
3710
3711    impl AgentSessionTruncate for FakeAgentSessionEditor {
3712        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3713            Task::ready(Ok(()))
3714        }
3715    }
3716
3717    #[gpui::test]
3718    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3719        init_test(cx);
3720
3721        let fs = FakeFs::new(cx.executor());
3722        let project = Project::test(fs, [], cx).await;
3723        let connection = Rc::new(FakeAgentConnection::new());
3724        let thread = cx
3725            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3726            .await
3727            .unwrap();
3728
3729        // Try to update a tool call that doesn't exist
3730        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
3731        thread.update(cx, |thread, cx| {
3732            let result = thread.handle_session_update(
3733                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3734                    nonexistent_id.clone(),
3735                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3736                )),
3737                cx,
3738            );
3739
3740            // The update should succeed (not return an error)
3741            assert!(result.is_ok());
3742
3743            // There should now be exactly one entry in the thread
3744            assert_eq!(thread.entries.len(), 1);
3745
3746            // The entry should be a failed tool call
3747            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3748                assert_eq!(tool_call.id, nonexistent_id);
3749                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3750                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3751
3752                // Check that the content contains the error message
3753                assert_eq!(tool_call.content.len(), 1);
3754                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3755                    match content_block {
3756                        ContentBlock::Markdown { markdown } => {
3757                            let markdown_text = markdown.read(cx).source();
3758                            assert!(markdown_text.contains("Tool call not found"));
3759                        }
3760                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3761                        ContentBlock::ResourceLink { .. } => {
3762                            panic!("Expected markdown content, got resource link")
3763                        }
3764                    }
3765                } else {
3766                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3767                }
3768            } else {
3769                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3770            }
3771        });
3772    }
3773
3774    /// Tests that restoring a checkpoint properly cleans up terminals that were
3775    /// created after that checkpoint, and cancels any in-progress generation.
3776    ///
3777    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
3778    /// that were started after that checkpoint should be terminated, and any in-progress
3779    /// AI generation should be canceled.
3780    #[gpui::test]
3781    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
3782        init_test(cx);
3783
3784        let fs = FakeFs::new(cx.executor());
3785        let project = Project::test(fs, [], cx).await;
3786        let connection = Rc::new(FakeAgentConnection::new());
3787        let thread = cx
3788            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3789            .await
3790            .unwrap();
3791
3792        // Send first user message to create a checkpoint
3793        cx.update(|cx| {
3794            thread.update(cx, |thread, cx| {
3795                thread.send(vec!["first message".into()], cx)
3796            })
3797        })
3798        .await
3799        .unwrap();
3800
3801        // Send second message (creates another checkpoint) - we'll restore to this one
3802        cx.update(|cx| {
3803            thread.update(cx, |thread, cx| {
3804                thread.send(vec!["second message".into()], cx)
3805            })
3806        })
3807        .await
3808        .unwrap();
3809
3810        // Create 2 terminals BEFORE the checkpoint that have completed running
3811        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3812        let mock_terminal_1 = cx.new(|cx| {
3813            let builder = ::terminal::TerminalBuilder::new_display_only(
3814                ::terminal::terminal_settings::CursorShape::default(),
3815                ::terminal::terminal_settings::AlternateScroll::On,
3816                None,
3817                0,
3818            )
3819            .unwrap();
3820            builder.subscribe(cx)
3821        });
3822
3823        thread.update(cx, |thread, cx| {
3824            thread.on_terminal_provider_event(
3825                TerminalProviderEvent::Created {
3826                    terminal_id: terminal_id_1.clone(),
3827                    label: "echo 'first'".to_string(),
3828                    cwd: Some(PathBuf::from("/test")),
3829                    output_byte_limit: None,
3830                    terminal: mock_terminal_1.clone(),
3831                },
3832                cx,
3833            );
3834        });
3835
3836        thread.update(cx, |thread, cx| {
3837            thread.on_terminal_provider_event(
3838                TerminalProviderEvent::Output {
3839                    terminal_id: terminal_id_1.clone(),
3840                    data: b"first\n".to_vec(),
3841                },
3842                cx,
3843            );
3844        });
3845
3846        thread.update(cx, |thread, cx| {
3847            thread.on_terminal_provider_event(
3848                TerminalProviderEvent::Exit {
3849                    terminal_id: terminal_id_1.clone(),
3850                    status: acp::TerminalExitStatus::new().exit_code(0),
3851                },
3852                cx,
3853            );
3854        });
3855
3856        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3857        let mock_terminal_2 = cx.new(|cx| {
3858            let builder = ::terminal::TerminalBuilder::new_display_only(
3859                ::terminal::terminal_settings::CursorShape::default(),
3860                ::terminal::terminal_settings::AlternateScroll::On,
3861                None,
3862                0,
3863            )
3864            .unwrap();
3865            builder.subscribe(cx)
3866        });
3867
3868        thread.update(cx, |thread, cx| {
3869            thread.on_terminal_provider_event(
3870                TerminalProviderEvent::Created {
3871                    terminal_id: terminal_id_2.clone(),
3872                    label: "echo 'second'".to_string(),
3873                    cwd: Some(PathBuf::from("/test")),
3874                    output_byte_limit: None,
3875                    terminal: mock_terminal_2.clone(),
3876                },
3877                cx,
3878            );
3879        });
3880
3881        thread.update(cx, |thread, cx| {
3882            thread.on_terminal_provider_event(
3883                TerminalProviderEvent::Output {
3884                    terminal_id: terminal_id_2.clone(),
3885                    data: b"second\n".to_vec(),
3886                },
3887                cx,
3888            );
3889        });
3890
3891        thread.update(cx, |thread, cx| {
3892            thread.on_terminal_provider_event(
3893                TerminalProviderEvent::Exit {
3894                    terminal_id: terminal_id_2.clone(),
3895                    status: acp::TerminalExitStatus::new().exit_code(0),
3896                },
3897                cx,
3898            );
3899        });
3900
3901        // Get the second message ID to restore to
3902        let second_message_id = thread.read_with(cx, |thread, _| {
3903            // At this point we have:
3904            // - Index 0: First user message (with checkpoint)
3905            // - Index 1: Second user message (with checkpoint)
3906            // No assistant responses because FakeAgentConnection just returns EndTurn
3907            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
3908                panic!("expected user message at index 1");
3909            };
3910            message.id.clone().unwrap()
3911        });
3912
3913        // Create a terminal AFTER the checkpoint we'll restore to.
3914        // This simulates the AI agent starting a long-running terminal command.
3915        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3916        let mock_terminal = cx.new(|cx| {
3917            let builder = ::terminal::TerminalBuilder::new_display_only(
3918                ::terminal::terminal_settings::CursorShape::default(),
3919                ::terminal::terminal_settings::AlternateScroll::On,
3920                None,
3921                0,
3922            )
3923            .unwrap();
3924            builder.subscribe(cx)
3925        });
3926
3927        // Register the terminal as created
3928        thread.update(cx, |thread, cx| {
3929            thread.on_terminal_provider_event(
3930                TerminalProviderEvent::Created {
3931                    terminal_id: terminal_id.clone(),
3932                    label: "sleep 1000".to_string(),
3933                    cwd: Some(PathBuf::from("/test")),
3934                    output_byte_limit: None,
3935                    terminal: mock_terminal.clone(),
3936                },
3937                cx,
3938            );
3939        });
3940
3941        // Simulate the terminal producing output (still running)
3942        thread.update(cx, |thread, cx| {
3943            thread.on_terminal_provider_event(
3944                TerminalProviderEvent::Output {
3945                    terminal_id: terminal_id.clone(),
3946                    data: b"terminal is running...\n".to_vec(),
3947                },
3948                cx,
3949            );
3950        });
3951
3952        // Create a tool call entry that references this terminal
3953        // This represents the agent requesting a terminal command
3954        thread.update(cx, |thread, cx| {
3955            thread
3956                .handle_session_update(
3957                    acp::SessionUpdate::ToolCall(
3958                        acp::ToolCall::new("terminal-tool-1", "Running command")
3959                            .kind(acp::ToolKind::Execute)
3960                            .status(acp::ToolCallStatus::InProgress)
3961                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
3962                                terminal_id.clone(),
3963                            ))])
3964                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
3965                    ),
3966                    cx,
3967                )
3968                .unwrap();
3969        });
3970
3971        // Verify terminal exists and is in the thread
3972        let terminal_exists_before =
3973            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
3974        assert!(
3975            terminal_exists_before,
3976            "Terminal should exist before checkpoint restore"
3977        );
3978
3979        // Verify the terminal's underlying task is still running (not completed)
3980        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
3981            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
3982            terminal_entity.read_with(cx, |term, _cx| {
3983                term.output().is_none() // output is None means it's still running
3984            })
3985        });
3986        assert!(
3987            terminal_running_before,
3988            "Terminal should be running before checkpoint restore"
3989        );
3990
3991        // Verify we have the expected entries before restore
3992        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
3993        assert!(
3994            entry_count_before > 1,
3995            "Should have multiple entries before restore"
3996        );
3997
3998        // Restore the checkpoint to the second message.
3999        // This should:
4000        // 1. Cancel any in-progress generation (via the cancel() call)
4001        // 2. Remove the terminal that was created after that point
4002        thread
4003            .update(cx, |thread, cx| {
4004                thread.restore_checkpoint(second_message_id, cx)
4005            })
4006            .await
4007            .unwrap();
4008
4009        // Verify that no send_task is in progress after restore
4010        // (cancel() clears the send_task)
4011        let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4012        assert!(
4013            !has_send_task_after,
4014            "Should not have a send_task after restore (cancel should have cleared it)"
4015        );
4016
4017        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4018        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4019        assert_eq!(
4020            entry_count, 1,
4021            "Should have 1 entry after restore (only the first user message)"
4022        );
4023
4024        // Verify the 2 completed terminals from before the checkpoint still exist
4025        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4026            thread.terminals.contains_key(&terminal_id_1)
4027        });
4028        assert!(
4029            terminal_1_exists,
4030            "Terminal 1 (from before checkpoint) should still exist"
4031        );
4032
4033        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4034            thread.terminals.contains_key(&terminal_id_2)
4035        });
4036        assert!(
4037            terminal_2_exists,
4038            "Terminal 2 (from before checkpoint) should still exist"
4039        );
4040
4041        // Verify they're still in completed state
4042        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4043            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4044            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4045        });
4046        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4047
4048        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4049            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4050            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4051        });
4052        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4053
4054        // Verify the running terminal (created after checkpoint) was removed
4055        let terminal_3_exists =
4056            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4057        assert!(
4058            !terminal_3_exists,
4059            "Terminal 3 (created after checkpoint) should have been removed"
4060        );
4061
4062        // Verify total count is 2 (the two from before the checkpoint)
4063        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4064        assert_eq!(
4065            terminal_count, 2,
4066            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4067        );
4068    }
4069}