acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6/// Key used in ACP ToolCall meta to store the tool's programmatic name.
   7/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
   8pub const TOOL_NAME_META_KEY: &str = "tool_name";
   9
  10/// Key used in ACP ToolCall meta to store the session id when a subagent is spawned.
  11pub const SUBAGENT_SESSION_ID_META_KEY: &str = "subagent_session_id";
  12
  13/// Helper to extract tool name from ACP meta
  14pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  15    meta.as_ref()
  16        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  17        .and_then(|v| v.as_str())
  18        .map(|s| SharedString::from(s.to_owned()))
  19}
  20
  21/// Helper to extract subagent session id from ACP meta
  22pub fn subagent_session_id_from_meta(meta: &Option<acp::Meta>) -> Option<acp::SessionId> {
  23    meta.as_ref()
  24        .and_then(|m| m.get(SUBAGENT_SESSION_ID_META_KEY))
  25        .and_then(|v| v.as_str())
  26        .map(|s| acp::SessionId::from(s.to_string()))
  27}
  28
  29/// Helper to create meta with tool name
  30pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  31    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  32}
  33use collections::HashSet;
  34pub use connection::*;
  35pub use diff::*;
  36use language::language_settings::FormatOnSave;
  37pub use mention::*;
  38use project::lsp_store::{FormatTrigger, LspFormatTarget};
  39use serde::{Deserialize, Serialize};
  40use serde_json::to_string_pretty;
  41
  42use task::{Shell, ShellBuilder};
  43pub use terminal::*;
  44
  45use action_log::{ActionLog, ActionLogTelemetry};
  46use agent_client_protocol::{self as acp};
  47use anyhow::{Context as _, Result, anyhow};
  48use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  49use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  50use itertools::Itertools;
  51use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  52use markdown::Markdown;
  53use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  54use std::collections::HashMap;
  55use std::error::Error;
  56use std::fmt::{Formatter, Write};
  57use std::ops::Range;
  58use std::process::ExitStatus;
  59use std::rc::Rc;
  60use std::time::{Duration, Instant};
  61use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  62use text::Bias;
  63use ui::App;
  64use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  65use uuid::Uuid;
  66
  67#[derive(Debug)]
  68pub struct UserMessage {
  69    pub id: Option<UserMessageId>,
  70    pub content: ContentBlock,
  71    pub chunks: Vec<acp::ContentBlock>,
  72    pub checkpoint: Option<Checkpoint>,
  73    pub indented: bool,
  74}
  75
  76#[derive(Debug)]
  77pub struct Checkpoint {
  78    git_checkpoint: GitStoreCheckpoint,
  79    pub show: bool,
  80}
  81
  82impl UserMessage {
  83    fn to_markdown(&self, cx: &App) -> String {
  84        let mut markdown = String::new();
  85        if self
  86            .checkpoint
  87            .as_ref()
  88            .is_some_and(|checkpoint| checkpoint.show)
  89        {
  90            writeln!(markdown, "## User (checkpoint)").unwrap();
  91        } else {
  92            writeln!(markdown, "## User").unwrap();
  93        }
  94        writeln!(markdown).unwrap();
  95        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  96        writeln!(markdown).unwrap();
  97        markdown
  98    }
  99}
 100
 101#[derive(Debug, PartialEq)]
 102pub struct AssistantMessage {
 103    pub chunks: Vec<AssistantMessageChunk>,
 104    pub indented: bool,
 105}
 106
 107impl AssistantMessage {
 108    pub fn to_markdown(&self, cx: &App) -> String {
 109        format!(
 110            "## Assistant\n\n{}\n\n",
 111            self.chunks
 112                .iter()
 113                .map(|chunk| chunk.to_markdown(cx))
 114                .join("\n\n")
 115        )
 116    }
 117}
 118
 119#[derive(Debug, PartialEq)]
 120pub enum AssistantMessageChunk {
 121    Message { block: ContentBlock },
 122    Thought { block: ContentBlock },
 123}
 124
 125impl AssistantMessageChunk {
 126    pub fn from_str(
 127        chunk: &str,
 128        language_registry: &Arc<LanguageRegistry>,
 129        path_style: PathStyle,
 130        cx: &mut App,
 131    ) -> Self {
 132        Self::Message {
 133            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 134        }
 135    }
 136
 137    fn to_markdown(&self, cx: &App) -> String {
 138        match self {
 139            Self::Message { block } => block.to_markdown(cx).to_string(),
 140            Self::Thought { block } => {
 141                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 142            }
 143        }
 144    }
 145}
 146
 147#[derive(Debug)]
 148pub enum AgentThreadEntry {
 149    UserMessage(UserMessage),
 150    AssistantMessage(AssistantMessage),
 151    ToolCall(ToolCall),
 152}
 153
 154impl AgentThreadEntry {
 155    pub fn is_indented(&self) -> bool {
 156        match self {
 157            Self::UserMessage(message) => message.indented,
 158            Self::AssistantMessage(message) => message.indented,
 159            Self::ToolCall(_) => false,
 160        }
 161    }
 162
 163    pub fn to_markdown(&self, cx: &App) -> String {
 164        match self {
 165            Self::UserMessage(message) => message.to_markdown(cx),
 166            Self::AssistantMessage(message) => message.to_markdown(cx),
 167            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 168        }
 169    }
 170
 171    pub fn user_message(&self) -> Option<&UserMessage> {
 172        if let AgentThreadEntry::UserMessage(message) = self {
 173            Some(message)
 174        } else {
 175            None
 176        }
 177    }
 178
 179    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 180        if let AgentThreadEntry::ToolCall(call) = self {
 181            itertools::Either::Left(call.diffs())
 182        } else {
 183            itertools::Either::Right(std::iter::empty())
 184        }
 185    }
 186
 187    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 188        if let AgentThreadEntry::ToolCall(call) = self {
 189            itertools::Either::Left(call.terminals())
 190        } else {
 191            itertools::Either::Right(std::iter::empty())
 192        }
 193    }
 194
 195    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 196        if let AgentThreadEntry::ToolCall(ToolCall {
 197            locations,
 198            resolved_locations,
 199            ..
 200        }) = self
 201        {
 202            Some((
 203                locations.get(ix)?.clone(),
 204                resolved_locations.get(ix)?.clone()?,
 205            ))
 206        } else {
 207            None
 208        }
 209    }
 210}
 211
 212#[derive(Debug)]
 213pub struct ToolCall {
 214    pub id: acp::ToolCallId,
 215    pub label: Entity<Markdown>,
 216    pub kind: acp::ToolKind,
 217    pub content: Vec<ToolCallContent>,
 218    pub status: ToolCallStatus,
 219    pub locations: Vec<acp::ToolCallLocation>,
 220    pub resolved_locations: Vec<Option<AgentLocation>>,
 221    pub raw_input: Option<serde_json::Value>,
 222    pub raw_input_markdown: Option<Entity<Markdown>>,
 223    pub raw_output: Option<serde_json::Value>,
 224    pub tool_name: Option<SharedString>,
 225    pub subagent_session_id: Option<acp::SessionId>,
 226}
 227
 228impl ToolCall {
 229    fn from_acp(
 230        tool_call: acp::ToolCall,
 231        status: ToolCallStatus,
 232        language_registry: Arc<LanguageRegistry>,
 233        path_style: PathStyle,
 234        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 235        cx: &mut App,
 236    ) -> Result<Self> {
 237        let title = if tool_call.kind == acp::ToolKind::Execute {
 238            tool_call.title
 239        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 240            first_line.to_owned() + ""
 241        } else {
 242            tool_call.title
 243        };
 244        let mut content = Vec::with_capacity(tool_call.content.len());
 245        for item in tool_call.content {
 246            if let Some(item) = ToolCallContent::from_acp(
 247                item,
 248                language_registry.clone(),
 249                path_style,
 250                terminals,
 251                cx,
 252            )? {
 253                content.push(item);
 254            }
 255        }
 256
 257        let raw_input_markdown = tool_call
 258            .raw_input
 259            .as_ref()
 260            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 261
 262        let tool_name = tool_name_from_meta(&tool_call.meta);
 263
 264        let subagent_session = subagent_session_id_from_meta(&tool_call.meta);
 265
 266        let result = Self {
 267            id: tool_call.tool_call_id,
 268            label: cx
 269                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 270            kind: tool_call.kind,
 271            content,
 272            locations: tool_call.locations,
 273            resolved_locations: Vec::default(),
 274            status,
 275            raw_input: tool_call.raw_input,
 276            raw_input_markdown,
 277            raw_output: tool_call.raw_output,
 278            tool_name,
 279            subagent_session_id: subagent_session,
 280        };
 281        Ok(result)
 282    }
 283
 284    fn update_fields(
 285        &mut self,
 286        fields: acp::ToolCallUpdateFields,
 287        meta: Option<acp::Meta>,
 288        language_registry: Arc<LanguageRegistry>,
 289        path_style: PathStyle,
 290        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 291        cx: &mut App,
 292    ) -> Result<()> {
 293        let acp::ToolCallUpdateFields {
 294            kind,
 295            status,
 296            title,
 297            content,
 298            locations,
 299            raw_input,
 300            raw_output,
 301            ..
 302        } = fields;
 303
 304        if let Some(kind) = kind {
 305            self.kind = kind;
 306        }
 307
 308        if let Some(status) = status {
 309            self.status = status.into();
 310        }
 311
 312        if let Some(subagent_session_id) = subagent_session_id_from_meta(&meta) {
 313            self.subagent_session_id = Some(subagent_session_id);
 314        }
 315
 316        if let Some(title) = title {
 317            self.label.update(cx, |label, cx| {
 318                if self.kind == acp::ToolKind::Execute {
 319                    label.replace(title, cx);
 320                } else if let Some((first_line, _)) = title.split_once("\n") {
 321                    label.replace(first_line.to_owned() + "", cx);
 322                } else {
 323                    label.replace(title, cx);
 324                }
 325            });
 326        }
 327
 328        if let Some(content) = content {
 329            let mut new_content_len = content.len();
 330            let mut content = content.into_iter();
 331
 332            // Reuse existing content if we can
 333            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 334                let valid_content =
 335                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 336                if !valid_content {
 337                    new_content_len -= 1;
 338                }
 339            }
 340            for new in content {
 341                if let Some(new) = ToolCallContent::from_acp(
 342                    new,
 343                    language_registry.clone(),
 344                    path_style,
 345                    terminals,
 346                    cx,
 347                )? {
 348                    self.content.push(new);
 349                } else {
 350                    new_content_len -= 1;
 351                }
 352            }
 353            self.content.truncate(new_content_len);
 354        }
 355
 356        if let Some(locations) = locations {
 357            self.locations = locations;
 358        }
 359
 360        if let Some(raw_input) = raw_input {
 361            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 362            self.raw_input = Some(raw_input);
 363        }
 364
 365        if let Some(raw_output) = raw_output {
 366            if self.content.is_empty()
 367                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 368            {
 369                self.content
 370                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 371                        markdown,
 372                    }));
 373            }
 374            self.raw_output = Some(raw_output);
 375        }
 376        Ok(())
 377    }
 378
 379    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 380        self.content.iter().filter_map(|content| match content {
 381            ToolCallContent::Diff(diff) => Some(diff),
 382            ToolCallContent::ContentBlock(_) => None,
 383            ToolCallContent::Terminal(_) => None,
 384        })
 385    }
 386
 387    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 388        self.content.iter().filter_map(|content| match content {
 389            ToolCallContent::Terminal(terminal) => Some(terminal),
 390            ToolCallContent::ContentBlock(_) => None,
 391            ToolCallContent::Diff(_) => None,
 392        })
 393    }
 394
 395    pub fn is_subagent(&self) -> bool {
 396        self.tool_name.as_ref().is_some_and(|s| s == "subagent")
 397            || self.subagent_session_id.is_some()
 398    }
 399
 400    pub fn to_markdown(&self, cx: &App) -> String {
 401        let mut markdown = format!(
 402            "**Tool Call: {}**\nStatus: {}\n\n",
 403            self.label.read(cx).source(),
 404            self.status
 405        );
 406        for content in &self.content {
 407            markdown.push_str(content.to_markdown(cx).as_str());
 408            markdown.push_str("\n\n");
 409        }
 410        markdown
 411    }
 412
 413    async fn resolve_location(
 414        location: acp::ToolCallLocation,
 415        project: WeakEntity<Project>,
 416        cx: &mut AsyncApp,
 417    ) -> Option<ResolvedLocation> {
 418        let buffer = project
 419            .update(cx, |project, cx| {
 420                project
 421                    .project_path_for_absolute_path(&location.path, cx)
 422                    .map(|path| project.open_buffer(path, cx))
 423            })
 424            .ok()??;
 425        let buffer = buffer.await.log_err()?;
 426        let position = buffer.update(cx, |buffer, _| {
 427            let snapshot = buffer.snapshot();
 428            if let Some(row) = location.line {
 429                let column = snapshot.indent_size_for_line(row).len;
 430                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 431                snapshot.anchor_before(point)
 432            } else {
 433                Anchor::min_for_buffer(snapshot.remote_id())
 434            }
 435        });
 436
 437        Some(ResolvedLocation { buffer, position })
 438    }
 439
 440    fn resolve_locations(
 441        &self,
 442        project: Entity<Project>,
 443        cx: &mut App,
 444    ) -> Task<Vec<Option<ResolvedLocation>>> {
 445        let locations = self.locations.clone();
 446        project.update(cx, |_, cx| {
 447            cx.spawn(async move |project, cx| {
 448                let mut new_locations = Vec::new();
 449                for location in locations {
 450                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 451                }
 452                new_locations
 453            })
 454        })
 455    }
 456}
 457
 458// Separate so we can hold a strong reference to the buffer
 459// for saving on the thread
 460#[derive(Clone, Debug, PartialEq, Eq)]
 461struct ResolvedLocation {
 462    buffer: Entity<Buffer>,
 463    position: Anchor,
 464}
 465
 466impl From<&ResolvedLocation> for AgentLocation {
 467    fn from(value: &ResolvedLocation) -> Self {
 468        Self {
 469            buffer: value.buffer.downgrade(),
 470            position: value.position,
 471        }
 472    }
 473}
 474
 475#[derive(Debug)]
 476pub enum ToolCallStatus {
 477    /// The tool call hasn't started running yet, but we start showing it to
 478    /// the user.
 479    Pending,
 480    /// The tool call is waiting for confirmation from the user.
 481    WaitingForConfirmation {
 482        options: PermissionOptions,
 483        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 484    },
 485    /// The tool call is currently running.
 486    InProgress,
 487    /// The tool call completed successfully.
 488    Completed,
 489    /// The tool call failed.
 490    Failed,
 491    /// The user rejected the tool call.
 492    Rejected,
 493    /// The user canceled generation so the tool call was canceled.
 494    Canceled,
 495}
 496
 497impl From<acp::ToolCallStatus> for ToolCallStatus {
 498    fn from(status: acp::ToolCallStatus) -> Self {
 499        match status {
 500            acp::ToolCallStatus::Pending => Self::Pending,
 501            acp::ToolCallStatus::InProgress => Self::InProgress,
 502            acp::ToolCallStatus::Completed => Self::Completed,
 503            acp::ToolCallStatus::Failed => Self::Failed,
 504            _ => Self::Pending,
 505        }
 506    }
 507}
 508
 509impl Display for ToolCallStatus {
 510    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 511        write!(
 512            f,
 513            "{}",
 514            match self {
 515                ToolCallStatus::Pending => "Pending",
 516                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 517                ToolCallStatus::InProgress => "In Progress",
 518                ToolCallStatus::Completed => "Completed",
 519                ToolCallStatus::Failed => "Failed",
 520                ToolCallStatus::Rejected => "Rejected",
 521                ToolCallStatus::Canceled => "Canceled",
 522            }
 523        )
 524    }
 525}
 526
 527#[derive(Debug, PartialEq, Clone)]
 528pub enum ContentBlock {
 529    Empty,
 530    Markdown { markdown: Entity<Markdown> },
 531    ResourceLink { resource_link: acp::ResourceLink },
 532    Image { image: Arc<gpui::Image> },
 533}
 534
 535impl ContentBlock {
 536    pub fn new(
 537        block: acp::ContentBlock,
 538        language_registry: &Arc<LanguageRegistry>,
 539        path_style: PathStyle,
 540        cx: &mut App,
 541    ) -> Self {
 542        let mut this = Self::Empty;
 543        this.append(block, language_registry, path_style, cx);
 544        this
 545    }
 546
 547    pub fn new_combined(
 548        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 549        language_registry: Arc<LanguageRegistry>,
 550        path_style: PathStyle,
 551        cx: &mut App,
 552    ) -> Self {
 553        let mut this = Self::Empty;
 554        for block in blocks {
 555            this.append(block, &language_registry, path_style, cx);
 556        }
 557        this
 558    }
 559
 560    pub fn append(
 561        &mut self,
 562        block: acp::ContentBlock,
 563        language_registry: &Arc<LanguageRegistry>,
 564        path_style: PathStyle,
 565        cx: &mut App,
 566    ) {
 567        match (&mut *self, &block) {
 568            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 569                *self = ContentBlock::ResourceLink {
 570                    resource_link: resource_link.clone(),
 571                };
 572            }
 573            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 574                if let Some(image) = Self::decode_image(image_content) {
 575                    *self = ContentBlock::Image { image };
 576                } else {
 577                    let new_content = Self::image_md(image_content);
 578                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 579                }
 580            }
 581            (ContentBlock::Empty, _) => {
 582                let new_content = Self::block_string_contents(&block, path_style);
 583                *self = Self::create_markdown_block(new_content, language_registry, cx);
 584            }
 585            (ContentBlock::Markdown { markdown }, _) => {
 586                let new_content = Self::block_string_contents(&block, path_style);
 587                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 588            }
 589            (ContentBlock::ResourceLink { resource_link }, _) => {
 590                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 591                let new_content = Self::block_string_contents(&block, path_style);
 592                let combined = format!("{}\n{}", existing_content, new_content);
 593                *self = Self::create_markdown_block(combined, language_registry, cx);
 594            }
 595            (ContentBlock::Image { .. }, _) => {
 596                let new_content = Self::block_string_contents(&block, path_style);
 597                let combined = format!("`Image`\n{}", new_content);
 598                *self = Self::create_markdown_block(combined, language_registry, cx);
 599            }
 600        }
 601    }
 602
 603    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 604        use base64::Engine as _;
 605
 606        let bytes = base64::engine::general_purpose::STANDARD
 607            .decode(image_content.data.as_bytes())
 608            .ok()?;
 609        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 610        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 611    }
 612
 613    fn create_markdown_block(
 614        content: String,
 615        language_registry: &Arc<LanguageRegistry>,
 616        cx: &mut App,
 617    ) -> ContentBlock {
 618        ContentBlock::Markdown {
 619            markdown: cx
 620                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 621        }
 622    }
 623
 624    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 625        match block {
 626            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 627            acp::ContentBlock::ResourceLink(resource_link) => {
 628                Self::resource_link_md(&resource_link.uri, path_style)
 629            }
 630            acp::ContentBlock::Resource(acp::EmbeddedResource {
 631                resource:
 632                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 633                        uri,
 634                        ..
 635                    }),
 636                ..
 637            }) => Self::resource_link_md(uri, path_style),
 638            acp::ContentBlock::Image(image) => Self::image_md(image),
 639            _ => String::new(),
 640        }
 641    }
 642
 643    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 644        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 645            uri.as_link().to_string()
 646        } else {
 647            uri.to_string()
 648        }
 649    }
 650
 651    fn image_md(_image: &acp::ImageContent) -> String {
 652        "`Image`".into()
 653    }
 654
 655    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 656        match self {
 657            ContentBlock::Empty => "",
 658            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 659            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 660            ContentBlock::Image { .. } => "`Image`",
 661        }
 662    }
 663
 664    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 665        match self {
 666            ContentBlock::Empty => None,
 667            ContentBlock::Markdown { markdown } => Some(markdown),
 668            ContentBlock::ResourceLink { .. } => None,
 669            ContentBlock::Image { .. } => None,
 670        }
 671    }
 672
 673    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 674        match self {
 675            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 676            _ => None,
 677        }
 678    }
 679
 680    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 681        match self {
 682            ContentBlock::Image { image } => Some(image),
 683            _ => None,
 684        }
 685    }
 686}
 687
 688#[derive(Debug)]
 689pub enum ToolCallContent {
 690    ContentBlock(ContentBlock),
 691    Diff(Entity<Diff>),
 692    Terminal(Entity<Terminal>),
 693}
 694
 695impl ToolCallContent {
 696    pub fn from_acp(
 697        content: acp::ToolCallContent,
 698        language_registry: Arc<LanguageRegistry>,
 699        path_style: PathStyle,
 700        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 701        cx: &mut App,
 702    ) -> Result<Option<Self>> {
 703        match content {
 704            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 705                Ok(Some(Self::ContentBlock(ContentBlock::new(
 706                    content,
 707                    &language_registry,
 708                    path_style,
 709                    cx,
 710                ))))
 711            }
 712            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 713                Diff::finalized(
 714                    diff.path.to_string_lossy().into_owned(),
 715                    diff.old_text,
 716                    diff.new_text,
 717                    language_registry,
 718                    cx,
 719                )
 720            })))),
 721            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 722                .get(&terminal_id)
 723                .cloned()
 724                .map(|terminal| Some(Self::Terminal(terminal)))
 725                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 726            _ => Ok(None),
 727        }
 728    }
 729
 730    pub fn update_from_acp(
 731        &mut self,
 732        new: acp::ToolCallContent,
 733        language_registry: Arc<LanguageRegistry>,
 734        path_style: PathStyle,
 735        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 736        cx: &mut App,
 737    ) -> Result<bool> {
 738        let needs_update = match (&self, &new) {
 739            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 740                old_diff.read(cx).needs_update(
 741                    new_diff.old_text.as_deref().unwrap_or(""),
 742                    &new_diff.new_text,
 743                    cx,
 744                )
 745            }
 746            _ => true,
 747        };
 748
 749        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 750            if needs_update {
 751                *self = update;
 752            }
 753            Ok(true)
 754        } else {
 755            Ok(false)
 756        }
 757    }
 758
 759    pub fn to_markdown(&self, cx: &App) -> String {
 760        match self {
 761            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 762            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 763            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 764        }
 765    }
 766
 767    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 768        match self {
 769            Self::ContentBlock(content) => content.image(),
 770            _ => None,
 771        }
 772    }
 773}
 774
 775#[derive(Debug, PartialEq)]
 776pub enum ToolCallUpdate {
 777    UpdateFields(acp::ToolCallUpdate),
 778    UpdateDiff(ToolCallUpdateDiff),
 779    UpdateTerminal(ToolCallUpdateTerminal),
 780}
 781
 782impl ToolCallUpdate {
 783    fn id(&self) -> &acp::ToolCallId {
 784        match self {
 785            Self::UpdateFields(update) => &update.tool_call_id,
 786            Self::UpdateDiff(diff) => &diff.id,
 787            Self::UpdateTerminal(terminal) => &terminal.id,
 788        }
 789    }
 790}
 791
 792impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 793    fn from(update: acp::ToolCallUpdate) -> Self {
 794        Self::UpdateFields(update)
 795    }
 796}
 797
 798impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 799    fn from(diff: ToolCallUpdateDiff) -> Self {
 800        Self::UpdateDiff(diff)
 801    }
 802}
 803
 804#[derive(Debug, PartialEq)]
 805pub struct ToolCallUpdateDiff {
 806    pub id: acp::ToolCallId,
 807    pub diff: Entity<Diff>,
 808}
 809
 810impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 811    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 812        Self::UpdateTerminal(terminal)
 813    }
 814}
 815
 816#[derive(Debug, PartialEq)]
 817pub struct ToolCallUpdateTerminal {
 818    pub id: acp::ToolCallId,
 819    pub terminal: Entity<Terminal>,
 820}
 821
 822#[derive(Debug, Default)]
 823pub struct Plan {
 824    pub entries: Vec<PlanEntry>,
 825}
 826
 827#[derive(Debug)]
 828pub struct PlanStats<'a> {
 829    pub in_progress_entry: Option<&'a PlanEntry>,
 830    pub pending: u32,
 831    pub completed: u32,
 832}
 833
 834impl Plan {
 835    pub fn is_empty(&self) -> bool {
 836        self.entries.is_empty()
 837    }
 838
 839    pub fn stats(&self) -> PlanStats<'_> {
 840        let mut stats = PlanStats {
 841            in_progress_entry: None,
 842            pending: 0,
 843            completed: 0,
 844        };
 845
 846        for entry in &self.entries {
 847            match &entry.status {
 848                acp::PlanEntryStatus::Pending => {
 849                    stats.pending += 1;
 850                }
 851                acp::PlanEntryStatus::InProgress => {
 852                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 853                }
 854                acp::PlanEntryStatus::Completed => {
 855                    stats.completed += 1;
 856                }
 857                _ => {}
 858            }
 859        }
 860
 861        stats
 862    }
 863}
 864
 865#[derive(Debug)]
 866pub struct PlanEntry {
 867    pub content: Entity<Markdown>,
 868    pub priority: acp::PlanEntryPriority,
 869    pub status: acp::PlanEntryStatus,
 870}
 871
 872impl PlanEntry {
 873    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 874        Self {
 875            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 876            priority: entry.priority,
 877            status: entry.status,
 878        }
 879    }
 880}
 881
 882#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 883pub struct TokenUsage {
 884    pub max_tokens: u64,
 885    pub used_tokens: u64,
 886    pub input_tokens: u64,
 887    pub output_tokens: u64,
 888}
 889
 890impl TokenUsage {
 891    pub fn ratio(&self) -> TokenUsageRatio {
 892        #[cfg(debug_assertions)]
 893        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 894            .unwrap_or("0.8".to_string())
 895            .parse()
 896            .unwrap();
 897        #[cfg(not(debug_assertions))]
 898        let warning_threshold: f32 = 0.8;
 899
 900        // When the maximum is unknown because there is no selected model,
 901        // avoid showing the token limit warning.
 902        if self.max_tokens == 0 {
 903            TokenUsageRatio::Normal
 904        } else if self.used_tokens >= self.max_tokens {
 905            TokenUsageRatio::Exceeded
 906        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 907            TokenUsageRatio::Warning
 908        } else {
 909            TokenUsageRatio::Normal
 910        }
 911    }
 912}
 913
 914#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
 915pub enum TokenUsageRatio {
 916    Normal,
 917    Warning,
 918    Exceeded,
 919}
 920
 921#[derive(Debug, Clone)]
 922pub struct RetryStatus {
 923    pub last_error: SharedString,
 924    pub attempt: usize,
 925    pub max_attempts: usize,
 926    pub started_at: Instant,
 927    pub duration: Duration,
 928}
 929
 930pub struct AcpThread {
 931    parent_session_id: Option<acp::SessionId>,
 932    title: SharedString,
 933    entries: Vec<AgentThreadEntry>,
 934    plan: Plan,
 935    project: Entity<Project>,
 936    action_log: Entity<ActionLog>,
 937    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 938    send_task: Option<Task<()>>,
 939    connection: Rc<dyn AgentConnection>,
 940    session_id: acp::SessionId,
 941    token_usage: Option<TokenUsage>,
 942    prompt_capabilities: acp::PromptCapabilities,
 943    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 944    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 945    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 946    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 947    // subagent cancellation fields
 948    user_stopped: Arc<std::sync::atomic::AtomicBool>,
 949    user_stop_tx: watch::Sender<bool>,
 950}
 951
 952impl From<&AcpThread> for ActionLogTelemetry {
 953    fn from(value: &AcpThread) -> Self {
 954        Self {
 955            agent_telemetry_id: value.connection().telemetry_id(),
 956            session_id: value.session_id.0.clone(),
 957        }
 958    }
 959}
 960
 961#[derive(Debug)]
 962pub enum AcpThreadEvent {
 963    NewEntry,
 964    TitleUpdated,
 965    TokenUsageUpdated,
 966    EntryUpdated(usize),
 967    EntriesRemoved(Range<usize>),
 968    ToolAuthorizationRequired,
 969    Retry(RetryStatus),
 970    SubagentSpawned(acp::SessionId),
 971    Stopped,
 972    Error,
 973    LoadError(LoadError),
 974    PromptCapabilitiesUpdated,
 975    Refusal,
 976    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
 977    ModeUpdated(acp::SessionModeId),
 978    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
 979}
 980
 981impl EventEmitter<AcpThreadEvent> for AcpThread {}
 982
 983#[derive(Debug, Clone)]
 984pub enum TerminalProviderEvent {
 985    Created {
 986        terminal_id: acp::TerminalId,
 987        label: String,
 988        cwd: Option<PathBuf>,
 989        output_byte_limit: Option<u64>,
 990        terminal: Entity<::terminal::Terminal>,
 991    },
 992    Output {
 993        terminal_id: acp::TerminalId,
 994        data: Vec<u8>,
 995    },
 996    TitleChanged {
 997        terminal_id: acp::TerminalId,
 998        title: String,
 999    },
1000    Exit {
1001        terminal_id: acp::TerminalId,
1002        status: acp::TerminalExitStatus,
1003    },
1004}
1005
1006#[derive(Debug, Clone)]
1007pub enum TerminalProviderCommand {
1008    WriteInput {
1009        terminal_id: acp::TerminalId,
1010        bytes: Vec<u8>,
1011    },
1012    Resize {
1013        terminal_id: acp::TerminalId,
1014        cols: u16,
1015        rows: u16,
1016    },
1017    Close {
1018        terminal_id: acp::TerminalId,
1019    },
1020}
1021
1022impl AcpThread {
1023    pub fn on_terminal_provider_event(
1024        &mut self,
1025        event: TerminalProviderEvent,
1026        cx: &mut Context<Self>,
1027    ) {
1028        match event {
1029            TerminalProviderEvent::Created {
1030                terminal_id,
1031                label,
1032                cwd,
1033                output_byte_limit,
1034                terminal,
1035            } => {
1036                let entity = self.register_terminal_created(
1037                    terminal_id.clone(),
1038                    label,
1039                    cwd,
1040                    output_byte_limit,
1041                    terminal,
1042                    cx,
1043                );
1044
1045                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1046                    for data in chunks.drain(..) {
1047                        entity.update(cx, |term, cx| {
1048                            term.inner().update(cx, |inner, cx| {
1049                                inner.write_output(&data, cx);
1050                            })
1051                        });
1052                    }
1053                }
1054
1055                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1056                    entity.update(cx, |_term, cx| {
1057                        cx.notify();
1058                    });
1059                }
1060
1061                cx.notify();
1062            }
1063            TerminalProviderEvent::Output { terminal_id, data } => {
1064                if let Some(entity) = self.terminals.get(&terminal_id) {
1065                    entity.update(cx, |term, cx| {
1066                        term.inner().update(cx, |inner, cx| {
1067                            inner.write_output(&data, cx);
1068                        })
1069                    });
1070                } else {
1071                    self.pending_terminal_output
1072                        .entry(terminal_id)
1073                        .or_default()
1074                        .push(data);
1075                }
1076            }
1077            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1078                if let Some(entity) = self.terminals.get(&terminal_id) {
1079                    entity.update(cx, |term, cx| {
1080                        term.inner().update(cx, |inner, cx| {
1081                            inner.breadcrumb_text = title;
1082                            cx.emit(::terminal::Event::BreadcrumbsChanged);
1083                        })
1084                    });
1085                }
1086            }
1087            TerminalProviderEvent::Exit {
1088                terminal_id,
1089                status,
1090            } => {
1091                if let Some(entity) = self.terminals.get(&terminal_id) {
1092                    entity.update(cx, |_term, cx| {
1093                        cx.notify();
1094                    });
1095                } else {
1096                    self.pending_terminal_exit.insert(terminal_id, status);
1097                }
1098            }
1099        }
1100    }
1101}
1102
1103#[derive(PartialEq, Eq, Debug)]
1104pub enum ThreadStatus {
1105    Idle,
1106    Generating,
1107}
1108
1109#[derive(Debug, Clone)]
1110pub enum LoadError {
1111    Unsupported {
1112        command: SharedString,
1113        current_version: SharedString,
1114        minimum_version: SharedString,
1115    },
1116    FailedToInstall(SharedString),
1117    Exited {
1118        status: ExitStatus,
1119    },
1120    Other(SharedString),
1121}
1122
1123impl Display for LoadError {
1124    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1125        match self {
1126            LoadError::Unsupported {
1127                command: path,
1128                current_version,
1129                minimum_version,
1130            } => {
1131                write!(
1132                    f,
1133                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1134                )
1135            }
1136            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1137            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1138            LoadError::Other(msg) => write!(f, "{msg}"),
1139        }
1140    }
1141}
1142
1143impl Error for LoadError {}
1144
1145impl AcpThread {
1146    pub fn new(
1147        parent_session_id: Option<acp::SessionId>,
1148        title: impl Into<SharedString>,
1149        connection: Rc<dyn AgentConnection>,
1150        project: Entity<Project>,
1151        action_log: Entity<ActionLog>,
1152        session_id: acp::SessionId,
1153        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1154        cx: &mut Context<Self>,
1155    ) -> Self {
1156        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1157        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1158            loop {
1159                let caps = prompt_capabilities_rx.recv().await?;
1160                this.update(cx, |this, cx| {
1161                    this.prompt_capabilities = caps;
1162                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1163                })?;
1164            }
1165        });
1166
1167        let (user_stop_tx, _user_stop_rx) = watch::channel(false);
1168
1169        Self {
1170            parent_session_id,
1171            action_log,
1172            shared_buffers: Default::default(),
1173            entries: Default::default(),
1174            plan: Default::default(),
1175            title: title.into(),
1176            project,
1177            send_task: None,
1178            connection,
1179            session_id,
1180            token_usage: None,
1181            prompt_capabilities,
1182            _observe_prompt_capabilities: task,
1183            terminals: HashMap::default(),
1184            pending_terminal_output: HashMap::default(),
1185            pending_terminal_exit: HashMap::default(),
1186            user_stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1187            user_stop_tx,
1188        }
1189    }
1190
1191    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1192        self.parent_session_id.as_ref()
1193    }
1194
1195    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1196        self.prompt_capabilities.clone()
1197    }
1198
1199    /// Marks this thread as stopped by user action and signals any listeners.
1200    pub fn stop_by_user(&mut self) {
1201        self.user_stopped
1202            .store(true, std::sync::atomic::Ordering::SeqCst);
1203        self.user_stop_tx.send(true).ok();
1204        self.send_task.take();
1205    }
1206
1207    pub fn was_stopped_by_user(&self) -> bool {
1208        self.user_stopped.load(std::sync::atomic::Ordering::SeqCst)
1209    }
1210
1211    pub fn user_stop_receiver(&self) -> watch::Receiver<bool> {
1212        self.user_stop_tx.receiver()
1213    }
1214
1215    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1216        &self.connection
1217    }
1218
1219    pub fn action_log(&self) -> &Entity<ActionLog> {
1220        &self.action_log
1221    }
1222
1223    pub fn project(&self) -> &Entity<Project> {
1224        &self.project
1225    }
1226
1227    pub fn title(&self) -> SharedString {
1228        self.title.clone()
1229    }
1230
1231    pub fn entries(&self) -> &[AgentThreadEntry] {
1232        &self.entries
1233    }
1234
1235    pub fn session_id(&self) -> &acp::SessionId {
1236        &self.session_id
1237    }
1238
1239    pub fn status(&self) -> ThreadStatus {
1240        if self.send_task.is_some() {
1241            ThreadStatus::Generating
1242        } else {
1243            ThreadStatus::Idle
1244        }
1245    }
1246
1247    pub fn token_usage(&self) -> Option<&TokenUsage> {
1248        self.token_usage.as_ref()
1249    }
1250
1251    pub fn has_pending_edit_tool_calls(&self) -> bool {
1252        for entry in self.entries.iter().rev() {
1253            match entry {
1254                AgentThreadEntry::UserMessage(_) => return false,
1255                AgentThreadEntry::ToolCall(
1256                    call @ ToolCall {
1257                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1258                        ..
1259                    },
1260                ) if call.diffs().next().is_some() => {
1261                    return true;
1262                }
1263                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1264            }
1265        }
1266
1267        false
1268    }
1269
1270    pub fn has_in_progress_tool_calls(&self) -> bool {
1271        for entry in self.entries.iter().rev() {
1272            match entry {
1273                AgentThreadEntry::UserMessage(_) => return false,
1274                AgentThreadEntry::ToolCall(ToolCall {
1275                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1276                    ..
1277                }) => {
1278                    return true;
1279                }
1280                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1281            }
1282        }
1283
1284        false
1285    }
1286
1287    pub fn used_tools_since_last_user_message(&self) -> bool {
1288        for entry in self.entries.iter().rev() {
1289            match entry {
1290                AgentThreadEntry::UserMessage(..) => return false,
1291                AgentThreadEntry::AssistantMessage(..) => continue,
1292                AgentThreadEntry::ToolCall(..) => return true,
1293            }
1294        }
1295
1296        false
1297    }
1298
1299    pub fn handle_session_update(
1300        &mut self,
1301        update: acp::SessionUpdate,
1302        cx: &mut Context<Self>,
1303    ) -> Result<(), acp::Error> {
1304        match update {
1305            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1306                self.push_user_content_block(None, content, cx);
1307            }
1308            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1309                self.push_assistant_content_block(content, false, cx);
1310            }
1311            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1312                self.push_assistant_content_block(content, true, cx);
1313            }
1314            acp::SessionUpdate::ToolCall(tool_call) => {
1315                self.upsert_tool_call(tool_call, cx)?;
1316            }
1317            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1318                self.update_tool_call(tool_call_update, cx)?;
1319            }
1320            acp::SessionUpdate::Plan(plan) => {
1321                self.update_plan(plan, cx);
1322            }
1323            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1324                available_commands,
1325                ..
1326            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1327            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1328                current_mode_id,
1329                ..
1330            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1331            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1332                config_options,
1333                ..
1334            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1335            _ => {}
1336        }
1337        Ok(())
1338    }
1339
1340    pub fn push_user_content_block(
1341        &mut self,
1342        message_id: Option<UserMessageId>,
1343        chunk: acp::ContentBlock,
1344        cx: &mut Context<Self>,
1345    ) {
1346        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1347    }
1348
1349    pub fn push_user_content_block_with_indent(
1350        &mut self,
1351        message_id: Option<UserMessageId>,
1352        chunk: acp::ContentBlock,
1353        indented: bool,
1354        cx: &mut Context<Self>,
1355    ) {
1356        let language_registry = self.project.read(cx).languages().clone();
1357        let path_style = self.project.read(cx).path_style(cx);
1358        let entries_len = self.entries.len();
1359
1360        if let Some(last_entry) = self.entries.last_mut()
1361            && let AgentThreadEntry::UserMessage(UserMessage {
1362                id,
1363                content,
1364                chunks,
1365                indented: existing_indented,
1366                ..
1367            }) = last_entry
1368            && *existing_indented == indented
1369        {
1370            *id = message_id.or(id.take());
1371            content.append(chunk.clone(), &language_registry, path_style, cx);
1372            chunks.push(chunk);
1373            let idx = entries_len - 1;
1374            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1375        } else {
1376            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1377            self.push_entry(
1378                AgentThreadEntry::UserMessage(UserMessage {
1379                    id: message_id,
1380                    content,
1381                    chunks: vec![chunk],
1382                    checkpoint: None,
1383                    indented,
1384                }),
1385                cx,
1386            );
1387        }
1388    }
1389
1390    pub fn push_assistant_content_block(
1391        &mut self,
1392        chunk: acp::ContentBlock,
1393        is_thought: bool,
1394        cx: &mut Context<Self>,
1395    ) {
1396        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1397    }
1398
1399    pub fn push_assistant_content_block_with_indent(
1400        &mut self,
1401        chunk: acp::ContentBlock,
1402        is_thought: bool,
1403        indented: bool,
1404        cx: &mut Context<Self>,
1405    ) {
1406        let language_registry = self.project.read(cx).languages().clone();
1407        let path_style = self.project.read(cx).path_style(cx);
1408        let entries_len = self.entries.len();
1409        if let Some(last_entry) = self.entries.last_mut()
1410            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1411                chunks,
1412                indented: existing_indented,
1413            }) = last_entry
1414            && *existing_indented == indented
1415        {
1416            let idx = entries_len - 1;
1417            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1418            match (chunks.last_mut(), is_thought) {
1419                (Some(AssistantMessageChunk::Message { block }), false)
1420                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1421                    block.append(chunk, &language_registry, path_style, cx)
1422                }
1423                _ => {
1424                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1425                    if is_thought {
1426                        chunks.push(AssistantMessageChunk::Thought { block })
1427                    } else {
1428                        chunks.push(AssistantMessageChunk::Message { block })
1429                    }
1430                }
1431            }
1432        } else {
1433            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1434            let chunk = if is_thought {
1435                AssistantMessageChunk::Thought { block }
1436            } else {
1437                AssistantMessageChunk::Message { block }
1438            };
1439
1440            self.push_entry(
1441                AgentThreadEntry::AssistantMessage(AssistantMessage {
1442                    chunks: vec![chunk],
1443                    indented,
1444                }),
1445                cx,
1446            );
1447        }
1448    }
1449
1450    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1451        self.entries.push(entry);
1452        cx.emit(AcpThreadEvent::NewEntry);
1453    }
1454
1455    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1456        self.connection.set_title(&self.session_id, cx).is_some()
1457    }
1458
1459    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1460        if title != self.title {
1461            self.title = title.clone();
1462            cx.emit(AcpThreadEvent::TitleUpdated);
1463            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1464                return set_title.run(title, cx);
1465            }
1466        }
1467        Task::ready(Ok(()))
1468    }
1469
1470    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1471        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1472    }
1473
1474    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1475        self.token_usage = usage;
1476        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1477    }
1478
1479    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1480        cx.emit(AcpThreadEvent::Retry(status));
1481    }
1482
1483    pub fn update_tool_call(
1484        &mut self,
1485        update: impl Into<ToolCallUpdate>,
1486        cx: &mut Context<Self>,
1487    ) -> Result<()> {
1488        let update = update.into();
1489        let languages = self.project.read(cx).languages().clone();
1490        let path_style = self.project.read(cx).path_style(cx);
1491
1492        let ix = match self.index_for_tool_call(update.id()) {
1493            Some(ix) => ix,
1494            None => {
1495                // Tool call not found - create a failed tool call entry
1496                let failed_tool_call = ToolCall {
1497                    id: update.id().clone(),
1498                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1499                    kind: acp::ToolKind::Fetch,
1500                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1501                        "Tool call not found".into(),
1502                        &languages,
1503                        path_style,
1504                        cx,
1505                    ))],
1506                    status: ToolCallStatus::Failed,
1507                    locations: Vec::new(),
1508                    resolved_locations: Vec::new(),
1509                    raw_input: None,
1510                    raw_input_markdown: None,
1511                    raw_output: None,
1512                    tool_name: None,
1513                    subagent_session_id: None,
1514                };
1515                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1516                return Ok(());
1517            }
1518        };
1519        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1520            unreachable!()
1521        };
1522
1523        match update {
1524            ToolCallUpdate::UpdateFields(update) => {
1525                let location_updated = update.fields.locations.is_some();
1526                call.update_fields(
1527                    update.fields,
1528                    update.meta,
1529                    languages,
1530                    path_style,
1531                    &self.terminals,
1532                    cx,
1533                )?;
1534                if location_updated {
1535                    self.resolve_locations(update.tool_call_id, cx);
1536                }
1537            }
1538            ToolCallUpdate::UpdateDiff(update) => {
1539                call.content.clear();
1540                call.content.push(ToolCallContent::Diff(update.diff));
1541            }
1542            ToolCallUpdate::UpdateTerminal(update) => {
1543                call.content.clear();
1544                call.content
1545                    .push(ToolCallContent::Terminal(update.terminal));
1546            }
1547        }
1548
1549        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1550
1551        Ok(())
1552    }
1553
1554    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1555    pub fn upsert_tool_call(
1556        &mut self,
1557        tool_call: acp::ToolCall,
1558        cx: &mut Context<Self>,
1559    ) -> Result<(), acp::Error> {
1560        let status = tool_call.status.into();
1561        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1562    }
1563
1564    /// Fails if id does not match an existing entry.
1565    pub fn upsert_tool_call_inner(
1566        &mut self,
1567        update: acp::ToolCallUpdate,
1568        status: ToolCallStatus,
1569        cx: &mut Context<Self>,
1570    ) -> Result<(), acp::Error> {
1571        let language_registry = self.project.read(cx).languages().clone();
1572        let path_style = self.project.read(cx).path_style(cx);
1573        let id = update.tool_call_id.clone();
1574
1575        let agent_telemetry_id = self.connection().telemetry_id();
1576        let session = self.session_id();
1577        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1578            let status = if matches!(status, ToolCallStatus::Completed) {
1579                "completed"
1580            } else {
1581                "failed"
1582            };
1583            telemetry::event!(
1584                "Agent Tool Call Completed",
1585                agent_telemetry_id,
1586                session,
1587                status
1588            );
1589        }
1590
1591        if let Some(ix) = self.index_for_tool_call(&id) {
1592            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1593                unreachable!()
1594            };
1595
1596            call.update_fields(
1597                update.fields,
1598                update.meta,
1599                language_registry,
1600                path_style,
1601                &self.terminals,
1602                cx,
1603            )?;
1604            call.status = status;
1605
1606            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1607        } else {
1608            let call = ToolCall::from_acp(
1609                update.try_into()?,
1610                status,
1611                language_registry,
1612                self.project.read(cx).path_style(cx),
1613                &self.terminals,
1614                cx,
1615            )?;
1616            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1617        };
1618
1619        self.resolve_locations(id, cx);
1620        Ok(())
1621    }
1622
1623    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1624        self.entries
1625            .iter()
1626            .enumerate()
1627            .rev()
1628            .find_map(|(index, entry)| {
1629                if let AgentThreadEntry::ToolCall(tool_call) = entry
1630                    && &tool_call.id == id
1631                {
1632                    Some(index)
1633                } else {
1634                    None
1635                }
1636            })
1637    }
1638
1639    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1640        // The tool call we are looking for is typically the last one, or very close to the end.
1641        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1642        self.entries
1643            .iter_mut()
1644            .enumerate()
1645            .rev()
1646            .find_map(|(index, tool_call)| {
1647                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1648                    && &tool_call.id == id
1649                {
1650                    Some((index, tool_call))
1651                } else {
1652                    None
1653                }
1654            })
1655    }
1656
1657    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1658        self.entries
1659            .iter()
1660            .enumerate()
1661            .rev()
1662            .find_map(|(index, tool_call)| {
1663                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1664                    && &tool_call.id == id
1665                {
1666                    Some((index, tool_call))
1667                } else {
1668                    None
1669                }
1670            })
1671    }
1672
1673    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1674        let project = self.project.clone();
1675        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1676            return;
1677        };
1678        let task = tool_call.resolve_locations(project, cx);
1679        cx.spawn(async move |this, cx| {
1680            let resolved_locations = task.await;
1681
1682            this.update(cx, |this, cx| {
1683                let project = this.project.clone();
1684
1685                for location in resolved_locations.iter().flatten() {
1686                    this.shared_buffers
1687                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1688                }
1689                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1690                    return;
1691                };
1692
1693                if let Some(Some(location)) = resolved_locations.last() {
1694                    project.update(cx, |project, cx| {
1695                        let should_ignore = if let Some(agent_location) = project
1696                            .agent_location()
1697                            .filter(|agent_location| agent_location.buffer == location.buffer)
1698                        {
1699                            let snapshot = location.buffer.read(cx).snapshot();
1700                            let old_position = agent_location.position.to_point(&snapshot);
1701                            let new_position = location.position.to_point(&snapshot);
1702
1703                            // ignore this so that when we get updates from the edit tool
1704                            // the position doesn't reset to the startof line
1705                            old_position.row == new_position.row
1706                                && old_position.column > new_position.column
1707                        } else {
1708                            false
1709                        };
1710                        if !should_ignore {
1711                            project.set_agent_location(Some(location.into()), cx);
1712                        }
1713                    });
1714                }
1715
1716                let resolved_locations = resolved_locations
1717                    .iter()
1718                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1719                    .collect::<Vec<_>>();
1720
1721                if tool_call.resolved_locations != resolved_locations {
1722                    tool_call.resolved_locations = resolved_locations;
1723                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1724                }
1725            })
1726        })
1727        .detach();
1728    }
1729
1730    pub fn request_tool_call_authorization(
1731        &mut self,
1732        tool_call: acp::ToolCallUpdate,
1733        options: PermissionOptions,
1734        cx: &mut Context<Self>,
1735    ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1736        let (tx, rx) = oneshot::channel();
1737
1738        let status = ToolCallStatus::WaitingForConfirmation {
1739            options,
1740            respond_tx: tx,
1741        };
1742
1743        self.upsert_tool_call_inner(tool_call, status, cx)?;
1744        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1745
1746        let fut = async {
1747            match rx.await {
1748                Ok(option) => acp::RequestPermissionOutcome::Selected(
1749                    acp::SelectedPermissionOutcome::new(option),
1750                ),
1751                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1752            }
1753        }
1754        .boxed();
1755
1756        Ok(fut)
1757    }
1758
1759    pub fn authorize_tool_call(
1760        &mut self,
1761        id: acp::ToolCallId,
1762        option_id: acp::PermissionOptionId,
1763        option_kind: acp::PermissionOptionKind,
1764        cx: &mut Context<Self>,
1765    ) {
1766        let Some((ix, call)) = self.tool_call_mut(&id) else {
1767            return;
1768        };
1769
1770        let new_status = match option_kind {
1771            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1772                ToolCallStatus::Rejected
1773            }
1774            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1775                ToolCallStatus::InProgress
1776            }
1777            _ => ToolCallStatus::InProgress,
1778        };
1779
1780        let curr_status = mem::replace(&mut call.status, new_status);
1781
1782        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1783            respond_tx.send(option_id).log_err();
1784        } else if cfg!(debug_assertions) {
1785            panic!("tried to authorize an already authorized tool call");
1786        }
1787
1788        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1789    }
1790
1791    pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1792        let mut first_tool_call = None;
1793
1794        for entry in self.entries.iter().rev() {
1795            match &entry {
1796                AgentThreadEntry::ToolCall(call) => {
1797                    if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1798                        first_tool_call = Some(call);
1799                    } else {
1800                        continue;
1801                    }
1802                }
1803                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1804                    // Reached the beginning of the turn.
1805                    // If we had pending permission requests in the previous turn, they have been cancelled.
1806                    break;
1807                }
1808            }
1809        }
1810
1811        first_tool_call
1812    }
1813
1814    pub fn plan(&self) -> &Plan {
1815        &self.plan
1816    }
1817
1818    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1819        let new_entries_len = request.entries.len();
1820        let mut new_entries = request.entries.into_iter();
1821
1822        // Reuse existing markdown to prevent flickering
1823        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1824            let PlanEntry {
1825                content,
1826                priority,
1827                status,
1828            } = old;
1829            content.update(cx, |old, cx| {
1830                old.replace(new.content, cx);
1831            });
1832            *priority = new.priority;
1833            *status = new.status;
1834        }
1835        for new in new_entries {
1836            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1837        }
1838        self.plan.entries.truncate(new_entries_len);
1839
1840        cx.notify();
1841    }
1842
1843    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1844        self.plan
1845            .entries
1846            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1847        cx.notify();
1848    }
1849
1850    #[cfg(any(test, feature = "test-support"))]
1851    pub fn send_raw(
1852        &mut self,
1853        message: &str,
1854        cx: &mut Context<Self>,
1855    ) -> BoxFuture<'static, Result<()>> {
1856        self.send(vec![message.into()], cx)
1857    }
1858
1859    pub fn send(
1860        &mut self,
1861        message: Vec<acp::ContentBlock>,
1862        cx: &mut Context<Self>,
1863    ) -> BoxFuture<'static, Result<()>> {
1864        let block = ContentBlock::new_combined(
1865            message.clone(),
1866            self.project.read(cx).languages().clone(),
1867            self.project.read(cx).path_style(cx),
1868            cx,
1869        );
1870        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1871        let git_store = self.project.read(cx).git_store().clone();
1872
1873        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1874            Some(UserMessageId::new())
1875        } else {
1876            None
1877        };
1878
1879        self.run_turn(cx, async move |this, cx| {
1880            this.update(cx, |this, cx| {
1881                this.push_entry(
1882                    AgentThreadEntry::UserMessage(UserMessage {
1883                        id: message_id.clone(),
1884                        content: block,
1885                        chunks: message,
1886                        checkpoint: None,
1887                        indented: false,
1888                    }),
1889                    cx,
1890                );
1891            })
1892            .ok();
1893
1894            let old_checkpoint = git_store
1895                .update(cx, |git, cx| git.checkpoint(cx))
1896                .await
1897                .context("failed to get old checkpoint")
1898                .log_err();
1899            this.update(cx, |this, cx| {
1900                if let Some((_ix, message)) = this.last_user_message() {
1901                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1902                        git_checkpoint,
1903                        show: false,
1904                    });
1905                }
1906                this.connection.prompt(message_id, request, cx)
1907            })?
1908            .await
1909        })
1910    }
1911
1912    pub fn can_retry(&self, cx: &App) -> bool {
1913        self.connection.retry(&self.session_id, cx).is_some()
1914    }
1915
1916    pub fn retry(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1917        self.run_turn(cx, async move |this, cx| {
1918            this.update(cx, |this, cx| {
1919                this.connection
1920                    .retry(&this.session_id, cx)
1921                    .map(|retry| retry.run(cx))
1922            })?
1923            .context("retrying a session is not supported")?
1924            .await
1925        })
1926    }
1927
1928    fn run_turn(
1929        &mut self,
1930        cx: &mut Context<Self>,
1931        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1932    ) -> BoxFuture<'static, Result<()>> {
1933        self.clear_completed_plan_entries(cx);
1934
1935        let (tx, rx) = oneshot::channel();
1936        let cancel_task = self.cancel(cx);
1937
1938        self.send_task = Some(cx.spawn(async move |this, cx| {
1939            cancel_task.await;
1940            tx.send(f(this, cx).await).ok();
1941        }));
1942
1943        cx.spawn(async move |this, cx| {
1944            let response = rx.await;
1945
1946            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1947                .await?;
1948
1949            this.update(cx, |this, cx| {
1950                this.project
1951                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1952                match response {
1953                    Ok(Err(e)) => {
1954                        this.send_task.take();
1955                        cx.emit(AcpThreadEvent::Error);
1956                        Err(e)
1957                    }
1958                    Ok(Ok(r)) if r.stop_reason == acp::StopReason::MaxTokens => {
1959                        this.send_task.take();
1960                        cx.emit(AcpThreadEvent::Error);
1961                        Err(anyhow!("Max tokens reached"))
1962                    }
1963                    result => {
1964                        let canceled = matches!(
1965                            result,
1966                            Ok(Ok(acp::PromptResponse {
1967                                stop_reason: acp::StopReason::Cancelled,
1968                                ..
1969                            }))
1970                        );
1971
1972                        // We only take the task if the current prompt wasn't canceled.
1973                        //
1974                        // This prompt may have been canceled because another one was sent
1975                        // while it was still generating. In these cases, dropping `send_task`
1976                        // would cause the next generation to be canceled.
1977                        if !canceled {
1978                            this.send_task.take();
1979                        }
1980
1981                        // Handle refusal - distinguish between user prompt and tool call refusals
1982                        if let Ok(Ok(acp::PromptResponse {
1983                            stop_reason: acp::StopReason::Refusal,
1984                            ..
1985                        })) = result
1986                        {
1987                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1988                                // Check if there's a completed tool call with results after the last user message
1989                                // This indicates the refusal is in response to tool output, not the user's prompt
1990                                let has_completed_tool_call_after_user_msg =
1991                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1992                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
1993                                            // Check if the tool call has completed and has output
1994                                            matches!(tool_call.status, ToolCallStatus::Completed)
1995                                                && tool_call.raw_output.is_some()
1996                                        } else {
1997                                            false
1998                                        }
1999                                    });
2000
2001                                if has_completed_tool_call_after_user_msg {
2002                                    // Refusal is due to tool output - don't truncate, just notify
2003                                    // The model refused based on what the tool returned
2004                                    cx.emit(AcpThreadEvent::Refusal);
2005                                } else {
2006                                    // User prompt was refused - truncate back to before the user message
2007                                    let range = user_msg_ix..this.entries.len();
2008                                    if range.start < range.end {
2009                                        this.entries.truncate(user_msg_ix);
2010                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2011                                    }
2012                                    cx.emit(AcpThreadEvent::Refusal);
2013                                }
2014                            } else {
2015                                // No user message found, treat as general refusal
2016                                cx.emit(AcpThreadEvent::Refusal);
2017                            }
2018                        }
2019
2020                        cx.emit(AcpThreadEvent::Stopped);
2021                        Ok(())
2022                    }
2023                }
2024            })?
2025        })
2026        .boxed()
2027    }
2028
2029    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2030        let Some(send_task) = self.send_task.take() else {
2031            return Task::ready(());
2032        };
2033
2034        for entry in self.entries.iter_mut() {
2035            if let AgentThreadEntry::ToolCall(call) = entry {
2036                let cancel = matches!(
2037                    call.status,
2038                    ToolCallStatus::Pending
2039                        | ToolCallStatus::WaitingForConfirmation { .. }
2040                        | ToolCallStatus::InProgress
2041                );
2042
2043                if cancel {
2044                    call.status = ToolCallStatus::Canceled;
2045                }
2046            }
2047        }
2048
2049        self.connection.cancel(&self.session_id, cx);
2050
2051        // Wait for the send task to complete
2052        cx.foreground_executor().spawn(send_task)
2053    }
2054
2055    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2056    pub fn restore_checkpoint(
2057        &mut self,
2058        id: UserMessageId,
2059        cx: &mut Context<Self>,
2060    ) -> Task<Result<()>> {
2061        let Some((_, message)) = self.user_message_mut(&id) else {
2062            return Task::ready(Err(anyhow!("message not found")));
2063        };
2064
2065        let checkpoint = message
2066            .checkpoint
2067            .as_ref()
2068            .map(|c| c.git_checkpoint.clone());
2069
2070        // Cancel any in-progress generation before restoring
2071        let cancel_task = self.cancel(cx);
2072        let rewind = self.rewind(id.clone(), cx);
2073        let git_store = self.project.read(cx).git_store().clone();
2074
2075        cx.spawn(async move |_, cx| {
2076            cancel_task.await;
2077            rewind.await?;
2078            if let Some(checkpoint) = checkpoint {
2079                git_store
2080                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2081                    .await?;
2082            }
2083
2084            Ok(())
2085        })
2086    }
2087
2088    /// Rewinds this thread to before the entry at `index`, removing it and all
2089    /// subsequent entries while rejecting any action_log changes made from that point.
2090    /// Unlike `restore_checkpoint`, this method does not restore from git.
2091    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2092        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2093            return Task::ready(Err(anyhow!("not supported")));
2094        };
2095
2096        let telemetry = ActionLogTelemetry::from(&*self);
2097        cx.spawn(async move |this, cx| {
2098            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2099            this.update(cx, |this, cx| {
2100                if let Some((ix, _)) = this.user_message_mut(&id) {
2101                    // Collect all terminals from entries that will be removed
2102                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2103                        .iter()
2104                        .flat_map(|entry| entry.terminals())
2105                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2106                        .collect();
2107
2108                    let range = ix..this.entries.len();
2109                    this.entries.truncate(ix);
2110                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2111
2112                    // Kill and remove the terminals
2113                    for terminal_id in terminals_to_remove {
2114                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2115                            terminal.update(cx, |terminal, cx| {
2116                                terminal.kill(cx);
2117                            });
2118                        }
2119                    }
2120                }
2121                this.action_log().update(cx, |action_log, cx| {
2122                    action_log.reject_all_edits(Some(telemetry), cx)
2123                })
2124            })?
2125            .await;
2126            Ok(())
2127        })
2128    }
2129
2130    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2131        let git_store = self.project.read(cx).git_store().clone();
2132
2133        let Some((_, message)) = self.last_user_message() else {
2134            return Task::ready(Ok(()));
2135        };
2136        let Some(user_message_id) = message.id.clone() else {
2137            return Task::ready(Ok(()));
2138        };
2139        let Some(checkpoint) = message.checkpoint.as_ref() else {
2140            return Task::ready(Ok(()));
2141        };
2142        let old_checkpoint = checkpoint.git_checkpoint.clone();
2143
2144        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2145        cx.spawn(async move |this, cx| {
2146            let Some(new_checkpoint) = new_checkpoint
2147                .await
2148                .context("failed to get new checkpoint")
2149                .log_err()
2150            else {
2151                return Ok(());
2152            };
2153
2154            let equal = git_store
2155                .update(cx, |git, cx| {
2156                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2157                })
2158                .await
2159                .unwrap_or(true);
2160
2161            this.update(cx, |this, cx| {
2162                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2163                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2164                        checkpoint.show = !equal;
2165                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2166                    }
2167                }
2168            })?;
2169
2170            Ok(())
2171        })
2172    }
2173
2174    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2175        self.entries
2176            .iter_mut()
2177            .enumerate()
2178            .rev()
2179            .find_map(|(ix, entry)| {
2180                if let AgentThreadEntry::UserMessage(message) = entry {
2181                    Some((ix, message))
2182                } else {
2183                    None
2184                }
2185            })
2186    }
2187
2188    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2189        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2190            if let AgentThreadEntry::UserMessage(message) = entry {
2191                if message.id.as_ref() == Some(id) {
2192                    Some((ix, message))
2193                } else {
2194                    None
2195                }
2196            } else {
2197                None
2198            }
2199        })
2200    }
2201
2202    pub fn read_text_file(
2203        &self,
2204        path: PathBuf,
2205        line: Option<u32>,
2206        limit: Option<u32>,
2207        reuse_shared_snapshot: bool,
2208        cx: &mut Context<Self>,
2209    ) -> Task<Result<String, acp::Error>> {
2210        // Args are 1-based, move to 0-based
2211        let line = line.unwrap_or_default().saturating_sub(1);
2212        let limit = limit.unwrap_or(u32::MAX);
2213        let project = self.project.clone();
2214        let action_log = self.action_log.clone();
2215        cx.spawn(async move |this, cx| {
2216            let load = project.update(cx, |project, cx| {
2217                let path = project
2218                    .project_path_for_absolute_path(&path, cx)
2219                    .ok_or_else(|| {
2220                        acp::Error::resource_not_found(Some(path.display().to_string()))
2221                    })?;
2222                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2223            })?;
2224
2225            let buffer = load.await?;
2226
2227            let snapshot = if reuse_shared_snapshot {
2228                this.read_with(cx, |this, _| {
2229                    this.shared_buffers.get(&buffer.clone()).cloned()
2230                })
2231                .log_err()
2232                .flatten()
2233            } else {
2234                None
2235            };
2236
2237            let snapshot = if let Some(snapshot) = snapshot {
2238                snapshot
2239            } else {
2240                action_log.update(cx, |action_log, cx| {
2241                    action_log.buffer_read(buffer.clone(), cx);
2242                });
2243
2244                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2245                this.update(cx, |this, _| {
2246                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2247                })?;
2248                snapshot
2249            };
2250
2251            let max_point = snapshot.max_point();
2252            let start_position = Point::new(line, 0);
2253
2254            if start_position > max_point {
2255                return Err(acp::Error::invalid_params().data(format!(
2256                    "Attempting to read beyond the end of the file, line {}:{}",
2257                    max_point.row + 1,
2258                    max_point.column
2259                )));
2260            }
2261
2262            let start = snapshot.anchor_before(start_position);
2263            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2264
2265            project.update(cx, |project, cx| {
2266                project.set_agent_location(
2267                    Some(AgentLocation {
2268                        buffer: buffer.downgrade(),
2269                        position: start,
2270                    }),
2271                    cx,
2272                );
2273            });
2274
2275            Ok(snapshot.text_for_range(start..end).collect::<String>())
2276        })
2277    }
2278
2279    pub fn write_text_file(
2280        &self,
2281        path: PathBuf,
2282        content: String,
2283        cx: &mut Context<Self>,
2284    ) -> Task<Result<()>> {
2285        let project = self.project.clone();
2286        let action_log = self.action_log.clone();
2287        cx.spawn(async move |this, cx| {
2288            let load = project.update(cx, |project, cx| {
2289                let path = project
2290                    .project_path_for_absolute_path(&path, cx)
2291                    .context("invalid path")?;
2292                anyhow::Ok(project.open_buffer(path, cx))
2293            });
2294            let buffer = load?.await?;
2295            let snapshot = this.update(cx, |this, cx| {
2296                this.shared_buffers
2297                    .get(&buffer)
2298                    .cloned()
2299                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2300            })?;
2301            let edits = cx
2302                .background_executor()
2303                .spawn(async move {
2304                    let old_text = snapshot.text();
2305                    text_diff(old_text.as_str(), &content)
2306                        .into_iter()
2307                        .map(|(range, replacement)| {
2308                            (
2309                                snapshot.anchor_after(range.start)
2310                                    ..snapshot.anchor_before(range.end),
2311                                replacement,
2312                            )
2313                        })
2314                        .collect::<Vec<_>>()
2315                })
2316                .await;
2317
2318            project.update(cx, |project, cx| {
2319                project.set_agent_location(
2320                    Some(AgentLocation {
2321                        buffer: buffer.downgrade(),
2322                        position: edits
2323                            .last()
2324                            .map(|(range, _)| range.end)
2325                            .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2326                    }),
2327                    cx,
2328                );
2329            });
2330
2331            let format_on_save = cx.update(|cx| {
2332                action_log.update(cx, |action_log, cx| {
2333                    action_log.buffer_read(buffer.clone(), cx);
2334                });
2335
2336                let format_on_save = buffer.update(cx, |buffer, cx| {
2337                    buffer.edit(edits, None, cx);
2338
2339                    let settings = language::language_settings::language_settings(
2340                        buffer.language().map(|l| l.name()),
2341                        buffer.file(),
2342                        cx,
2343                    );
2344
2345                    settings.format_on_save != FormatOnSave::Off
2346                });
2347                action_log.update(cx, |action_log, cx| {
2348                    action_log.buffer_edited(buffer.clone(), cx);
2349                });
2350                format_on_save
2351            });
2352
2353            if format_on_save {
2354                let format_task = project.update(cx, |project, cx| {
2355                    project.format(
2356                        HashSet::from_iter([buffer.clone()]),
2357                        LspFormatTarget::Buffers,
2358                        false,
2359                        FormatTrigger::Save,
2360                        cx,
2361                    )
2362                });
2363                format_task.await.log_err();
2364
2365                action_log.update(cx, |action_log, cx| {
2366                    action_log.buffer_edited(buffer.clone(), cx);
2367                });
2368            }
2369
2370            project
2371                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2372                .await
2373        })
2374    }
2375
2376    pub fn create_terminal(
2377        &self,
2378        command: String,
2379        args: Vec<String>,
2380        extra_env: Vec<acp::EnvVariable>,
2381        cwd: Option<PathBuf>,
2382        output_byte_limit: Option<u64>,
2383        cx: &mut Context<Self>,
2384    ) -> Task<Result<Entity<Terminal>>> {
2385        let env = match &cwd {
2386            Some(dir) => self.project.update(cx, |project, cx| {
2387                project.environment().update(cx, |env, cx| {
2388                    env.directory_environment(dir.as_path().into(), cx)
2389                })
2390            }),
2391            None => Task::ready(None).shared(),
2392        };
2393        let env = cx.spawn(async move |_, _| {
2394            let mut env = env.await.unwrap_or_default();
2395            // Disables paging for `git` and hopefully other commands
2396            env.insert("PAGER".into(), "".into());
2397            for var in extra_env {
2398                env.insert(var.name, var.value);
2399            }
2400            env
2401        });
2402
2403        let project = self.project.clone();
2404        let language_registry = project.read(cx).languages().clone();
2405        let is_windows = project.read(cx).path_style(cx).is_windows();
2406
2407        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2408        let terminal_task = cx.spawn({
2409            let terminal_id = terminal_id.clone();
2410            async move |_this, cx| {
2411                let env = env.await;
2412                let shell = project
2413                    .update(cx, |project, cx| {
2414                        project
2415                            .remote_client()
2416                            .and_then(|r| r.read(cx).default_system_shell())
2417                    })
2418                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2419                let (task_command, task_args) =
2420                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2421                        .redirect_stdin_to_dev_null()
2422                        .build(Some(command.clone()), &args);
2423                let terminal = project
2424                    .update(cx, |project, cx| {
2425                        project.create_terminal_task(
2426                            task::SpawnInTerminal {
2427                                command: Some(task_command),
2428                                args: task_args,
2429                                cwd: cwd.clone(),
2430                                env,
2431                                ..Default::default()
2432                            },
2433                            cx,
2434                        )
2435                    })
2436                    .await?;
2437
2438                anyhow::Ok(cx.new(|cx| {
2439                    Terminal::new(
2440                        terminal_id,
2441                        &format!("{} {}", command, args.join(" ")),
2442                        cwd,
2443                        output_byte_limit.map(|l| l as usize),
2444                        terminal,
2445                        language_registry,
2446                        cx,
2447                    )
2448                }))
2449            }
2450        });
2451
2452        cx.spawn(async move |this, cx| {
2453            let terminal = terminal_task.await?;
2454            this.update(cx, |this, _cx| {
2455                this.terminals.insert(terminal_id, terminal.clone());
2456                terminal
2457            })
2458        })
2459    }
2460
2461    pub fn kill_terminal(
2462        &mut self,
2463        terminal_id: acp::TerminalId,
2464        cx: &mut Context<Self>,
2465    ) -> Result<()> {
2466        self.terminals
2467            .get(&terminal_id)
2468            .context("Terminal not found")?
2469            .update(cx, |terminal, cx| {
2470                terminal.kill(cx);
2471            });
2472
2473        Ok(())
2474    }
2475
2476    pub fn release_terminal(
2477        &mut self,
2478        terminal_id: acp::TerminalId,
2479        cx: &mut Context<Self>,
2480    ) -> Result<()> {
2481        self.terminals
2482            .remove(&terminal_id)
2483            .context("Terminal not found")?
2484            .update(cx, |terminal, cx| {
2485                terminal.kill(cx);
2486            });
2487
2488        Ok(())
2489    }
2490
2491    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2492        self.terminals
2493            .get(&terminal_id)
2494            .context("Terminal not found")
2495            .cloned()
2496    }
2497
2498    pub fn to_markdown(&self, cx: &App) -> String {
2499        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2500    }
2501
2502    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2503        cx.emit(AcpThreadEvent::LoadError(error));
2504    }
2505
2506    pub fn register_terminal_created(
2507        &mut self,
2508        terminal_id: acp::TerminalId,
2509        command_label: String,
2510        working_dir: Option<PathBuf>,
2511        output_byte_limit: Option<u64>,
2512        terminal: Entity<::terminal::Terminal>,
2513        cx: &mut Context<Self>,
2514    ) -> Entity<Terminal> {
2515        let language_registry = self.project.read(cx).languages().clone();
2516
2517        let entity = cx.new(|cx| {
2518            Terminal::new(
2519                terminal_id.clone(),
2520                &command_label,
2521                working_dir.clone(),
2522                output_byte_limit.map(|l| l as usize),
2523                terminal,
2524                language_registry,
2525                cx,
2526            )
2527        });
2528        self.terminals.insert(terminal_id.clone(), entity.clone());
2529        entity
2530    }
2531}
2532
2533fn markdown_for_raw_output(
2534    raw_output: &serde_json::Value,
2535    language_registry: &Arc<LanguageRegistry>,
2536    cx: &mut App,
2537) -> Option<Entity<Markdown>> {
2538    match raw_output {
2539        serde_json::Value::Null => None,
2540        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2541            Markdown::new(
2542                value.to_string().into(),
2543                Some(language_registry.clone()),
2544                None,
2545                cx,
2546            )
2547        })),
2548        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2549            Markdown::new(
2550                value.to_string().into(),
2551                Some(language_registry.clone()),
2552                None,
2553                cx,
2554            )
2555        })),
2556        serde_json::Value::String(value) => Some(cx.new(|cx| {
2557            Markdown::new(
2558                value.clone().into(),
2559                Some(language_registry.clone()),
2560                None,
2561                cx,
2562            )
2563        })),
2564        value => Some(cx.new(|cx| {
2565            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2566
2567            Markdown::new(
2568                format!("```json\n{}\n```", pretty_json).into(),
2569                Some(language_registry.clone()),
2570                None,
2571                cx,
2572            )
2573        })),
2574    }
2575}
2576
2577#[cfg(test)]
2578mod tests {
2579    use super::*;
2580    use anyhow::anyhow;
2581    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2582    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2583    use indoc::indoc;
2584    use project::{FakeFs, Fs};
2585    use rand::{distr, prelude::*};
2586    use serde_json::json;
2587    use settings::SettingsStore;
2588    use smol::stream::StreamExt as _;
2589    use std::{
2590        any::Any,
2591        cell::RefCell,
2592        path::Path,
2593        rc::Rc,
2594        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2595        time::Duration,
2596    };
2597    use util::path;
2598
2599    fn init_test(cx: &mut TestAppContext) {
2600        env_logger::try_init().ok();
2601        cx.update(|cx| {
2602            let settings_store = SettingsStore::test(cx);
2603            cx.set_global(settings_store);
2604        });
2605    }
2606
2607    #[gpui::test]
2608    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2609        init_test(cx);
2610
2611        let fs = FakeFs::new(cx.executor());
2612        let project = Project::test(fs, [], cx).await;
2613        let connection = Rc::new(FakeAgentConnection::new());
2614        let thread = cx
2615            .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2616            .await
2617            .unwrap();
2618
2619        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2620
2621        // Send Output BEFORE Created - should be buffered by acp_thread
2622        thread.update(cx, |thread, cx| {
2623            thread.on_terminal_provider_event(
2624                TerminalProviderEvent::Output {
2625                    terminal_id: terminal_id.clone(),
2626                    data: b"hello buffered".to_vec(),
2627                },
2628                cx,
2629            );
2630        });
2631
2632        // Create a display-only terminal and then send Created
2633        let lower = cx.new(|cx| {
2634            let builder = ::terminal::TerminalBuilder::new_display_only(
2635                ::terminal::terminal_settings::CursorShape::default(),
2636                ::terminal::terminal_settings::AlternateScroll::On,
2637                None,
2638                0,
2639                cx.background_executor(),
2640                PathStyle::local(),
2641            )
2642            .unwrap();
2643            builder.subscribe(cx)
2644        });
2645
2646        thread.update(cx, |thread, cx| {
2647            thread.on_terminal_provider_event(
2648                TerminalProviderEvent::Created {
2649                    terminal_id: terminal_id.clone(),
2650                    label: "Buffered Test".to_string(),
2651                    cwd: None,
2652                    output_byte_limit: None,
2653                    terminal: lower.clone(),
2654                },
2655                cx,
2656            );
2657        });
2658
2659        // After Created, buffered Output should have been flushed into the renderer
2660        let content = thread.read_with(cx, |thread, cx| {
2661            let term = thread.terminal(terminal_id.clone()).unwrap();
2662            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2663        });
2664
2665        assert!(
2666            content.contains("hello buffered"),
2667            "expected buffered output to render, got: {content}"
2668        );
2669    }
2670
2671    #[gpui::test]
2672    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2673        init_test(cx);
2674
2675        let fs = FakeFs::new(cx.executor());
2676        let project = Project::test(fs, [], cx).await;
2677        let connection = Rc::new(FakeAgentConnection::new());
2678        let thread = cx
2679            .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2680            .await
2681            .unwrap();
2682
2683        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2684
2685        // Send Output BEFORE Created
2686        thread.update(cx, |thread, cx| {
2687            thread.on_terminal_provider_event(
2688                TerminalProviderEvent::Output {
2689                    terminal_id: terminal_id.clone(),
2690                    data: b"pre-exit data".to_vec(),
2691                },
2692                cx,
2693            );
2694        });
2695
2696        // Send Exit BEFORE Created
2697        thread.update(cx, |thread, cx| {
2698            thread.on_terminal_provider_event(
2699                TerminalProviderEvent::Exit {
2700                    terminal_id: terminal_id.clone(),
2701                    status: acp::TerminalExitStatus::new().exit_code(0),
2702                },
2703                cx,
2704            );
2705        });
2706
2707        // Now create a display-only lower-level terminal and send Created
2708        let lower = cx.new(|cx| {
2709            let builder = ::terminal::TerminalBuilder::new_display_only(
2710                ::terminal::terminal_settings::CursorShape::default(),
2711                ::terminal::terminal_settings::AlternateScroll::On,
2712                None,
2713                0,
2714                cx.background_executor(),
2715                PathStyle::local(),
2716            )
2717            .unwrap();
2718            builder.subscribe(cx)
2719        });
2720
2721        thread.update(cx, |thread, cx| {
2722            thread.on_terminal_provider_event(
2723                TerminalProviderEvent::Created {
2724                    terminal_id: terminal_id.clone(),
2725                    label: "Buffered Exit Test".to_string(),
2726                    cwd: None,
2727                    output_byte_limit: None,
2728                    terminal: lower.clone(),
2729                },
2730                cx,
2731            );
2732        });
2733
2734        // Output should be present after Created (flushed from buffer)
2735        let content = thread.read_with(cx, |thread, cx| {
2736            let term = thread.terminal(terminal_id.clone()).unwrap();
2737            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2738        });
2739
2740        assert!(
2741            content.contains("pre-exit data"),
2742            "expected pre-exit data to render, got: {content}"
2743        );
2744    }
2745
2746    /// Test that killing a terminal via Terminal::kill properly:
2747    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2748    /// 2. The underlying terminal still has the output that was written before the kill
2749    ///
2750    /// This test verifies that the fix to kill_active_task (which now also kills
2751    /// the shell process in addition to the foreground process) properly allows
2752    /// wait_for_exit to complete instead of hanging indefinitely.
2753    #[cfg(unix)]
2754    #[gpui::test]
2755    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2756        use std::collections::HashMap;
2757        use task::Shell;
2758        use util::shell_builder::ShellBuilder;
2759
2760        init_test(cx);
2761        cx.executor().allow_parking();
2762
2763        let fs = FakeFs::new(cx.executor());
2764        let project = Project::test(fs, [], cx).await;
2765        let connection = Rc::new(FakeAgentConnection::new());
2766        let thread = cx
2767            .update(|cx| connection.new_session(project.clone(), Path::new(path!("/test")), cx))
2768            .await
2769            .unwrap();
2770
2771        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2772
2773        // Create a real PTY terminal that runs a command which prints output then sleeps
2774        // We use printf instead of echo and chain with && sleep to ensure proper execution
2775        let (completion_tx, _completion_rx) = smol::channel::unbounded();
2776        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2777            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2778            &[],
2779        );
2780
2781        let builder = cx
2782            .update(|cx| {
2783                ::terminal::TerminalBuilder::new(
2784                    None,
2785                    None,
2786                    task::Shell::WithArguments {
2787                        program,
2788                        args,
2789                        title_override: None,
2790                    },
2791                    HashMap::default(),
2792                    ::terminal::terminal_settings::CursorShape::default(),
2793                    ::terminal::terminal_settings::AlternateScroll::On,
2794                    None,
2795                    vec![],
2796                    0,
2797                    false,
2798                    0,
2799                    Some(completion_tx),
2800                    cx,
2801                    vec![],
2802                    PathStyle::local(),
2803                )
2804            })
2805            .await
2806            .unwrap();
2807
2808        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2809
2810        // Create the acp_thread Terminal wrapper
2811        thread.update(cx, |thread, cx| {
2812            thread.on_terminal_provider_event(
2813                TerminalProviderEvent::Created {
2814                    terminal_id: terminal_id.clone(),
2815                    label: "printf output_before_kill && sleep 60".to_string(),
2816                    cwd: None,
2817                    output_byte_limit: None,
2818                    terminal: lower_terminal.clone(),
2819                },
2820                cx,
2821            );
2822        });
2823
2824        // Wait for the printf command to execute and produce output
2825        // Use real time since parking is enabled
2826        cx.executor().timer(Duration::from_millis(500)).await;
2827
2828        // Get the acp_thread Terminal and kill it
2829        let wait_for_exit = thread.update(cx, |thread, cx| {
2830            let term = thread.terminals.get(&terminal_id).unwrap();
2831            let wait_for_exit = term.read(cx).wait_for_exit();
2832            term.update(cx, |term, cx| {
2833                term.kill(cx);
2834            });
2835            wait_for_exit
2836        });
2837
2838        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2839        // Before the fix to kill_active_task, this would hang forever because
2840        // only the foreground process was killed, not the shell, so the PTY
2841        // child never exited and wait_for_completed_task never completed.
2842        let exit_result = futures::select! {
2843            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2844            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2845        };
2846
2847        assert!(
2848            exit_result.is_some(),
2849            "wait_for_exit should complete after kill, but it timed out. \
2850            This indicates kill_active_task is not properly killing the shell process."
2851        );
2852
2853        // Give the system a chance to process any pending updates
2854        cx.run_until_parked();
2855
2856        // Verify that the underlying terminal still has the output that was
2857        // written before the kill. This verifies that killing doesn't lose output.
2858        let inner_content = thread.read_with(cx, |thread, cx| {
2859            let term = thread.terminals.get(&terminal_id).unwrap();
2860            term.read(cx).inner().read(cx).get_content()
2861        });
2862
2863        assert!(
2864            inner_content.contains("output_before_kill"),
2865            "Underlying terminal should contain output from before kill, got: {}",
2866            inner_content
2867        );
2868    }
2869
2870    #[gpui::test]
2871    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2872        init_test(cx);
2873
2874        let fs = FakeFs::new(cx.executor());
2875        let project = Project::test(fs, [], cx).await;
2876        let connection = Rc::new(FakeAgentConnection::new());
2877        let thread = cx
2878            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2879            .await
2880            .unwrap();
2881
2882        // Test creating a new user message
2883        thread.update(cx, |thread, cx| {
2884            thread.push_user_content_block(None, "Hello, ".into(), cx);
2885        });
2886
2887        thread.update(cx, |thread, cx| {
2888            assert_eq!(thread.entries.len(), 1);
2889            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2890                assert_eq!(user_msg.id, None);
2891                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2892            } else {
2893                panic!("Expected UserMessage");
2894            }
2895        });
2896
2897        // Test appending to existing user message
2898        let message_1_id = UserMessageId::new();
2899        thread.update(cx, |thread, cx| {
2900            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2901        });
2902
2903        thread.update(cx, |thread, cx| {
2904            assert_eq!(thread.entries.len(), 1);
2905            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2906                assert_eq!(user_msg.id, Some(message_1_id));
2907                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2908            } else {
2909                panic!("Expected UserMessage");
2910            }
2911        });
2912
2913        // Test creating new user message after assistant message
2914        thread.update(cx, |thread, cx| {
2915            thread.push_assistant_content_block("Assistant response".into(), false, cx);
2916        });
2917
2918        let message_2_id = UserMessageId::new();
2919        thread.update(cx, |thread, cx| {
2920            thread.push_user_content_block(
2921                Some(message_2_id.clone()),
2922                "New user message".into(),
2923                cx,
2924            );
2925        });
2926
2927        thread.update(cx, |thread, cx| {
2928            assert_eq!(thread.entries.len(), 3);
2929            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2930                assert_eq!(user_msg.id, Some(message_2_id));
2931                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2932            } else {
2933                panic!("Expected UserMessage at index 2");
2934            }
2935        });
2936    }
2937
2938    #[gpui::test]
2939    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2940        init_test(cx);
2941
2942        let fs = FakeFs::new(cx.executor());
2943        let project = Project::test(fs, [], cx).await;
2944        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2945            |_, thread, mut cx| {
2946                async move {
2947                    thread.update(&mut cx, |thread, cx| {
2948                        thread
2949                            .handle_session_update(
2950                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2951                                    "Thinking ".into(),
2952                                )),
2953                                cx,
2954                            )
2955                            .unwrap();
2956                        thread
2957                            .handle_session_update(
2958                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2959                                    "hard!".into(),
2960                                )),
2961                                cx,
2962                            )
2963                            .unwrap();
2964                    })?;
2965                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2966                }
2967                .boxed_local()
2968            },
2969        ));
2970
2971        let thread = cx
2972            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2973            .await
2974            .unwrap();
2975
2976        thread
2977            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2978            .await
2979            .unwrap();
2980
2981        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2982        assert_eq!(
2983            output,
2984            indoc! {r#"
2985            ## User
2986
2987            Hello from Zed!
2988
2989            ## Assistant
2990
2991            <thinking>
2992            Thinking hard!
2993            </thinking>
2994
2995            "#}
2996        );
2997    }
2998
2999    #[gpui::test]
3000    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3001        init_test(cx);
3002
3003        let fs = FakeFs::new(cx.executor());
3004        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3005            .await;
3006        let project = Project::test(fs.clone(), [], cx).await;
3007        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3008        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3009        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3010            move |_, thread, mut cx| {
3011                let read_file_tx = read_file_tx.clone();
3012                async move {
3013                    let content = thread
3014                        .update(&mut cx, |thread, cx| {
3015                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3016                        })
3017                        .unwrap()
3018                        .await
3019                        .unwrap();
3020                    assert_eq!(content, "one\ntwo\nthree\n");
3021                    read_file_tx.take().unwrap().send(()).unwrap();
3022                    thread
3023                        .update(&mut cx, |thread, cx| {
3024                            thread.write_text_file(
3025                                path!("/tmp/foo").into(),
3026                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3027                                cx,
3028                            )
3029                        })
3030                        .unwrap()
3031                        .await
3032                        .unwrap();
3033                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3034                }
3035                .boxed_local()
3036            },
3037        ));
3038
3039        let (worktree, pathbuf) = project
3040            .update(cx, |project, cx| {
3041                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3042            })
3043            .await
3044            .unwrap();
3045        let buffer = project
3046            .update(cx, |project, cx| {
3047                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3048            })
3049            .await
3050            .unwrap();
3051
3052        let thread = cx
3053            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3054            .await
3055            .unwrap();
3056
3057        let request = thread.update(cx, |thread, cx| {
3058            thread.send_raw("Extend the count in /tmp/foo", cx)
3059        });
3060        read_file_rx.await.ok();
3061        buffer.update(cx, |buffer, cx| {
3062            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3063        });
3064        cx.run_until_parked();
3065        assert_eq!(
3066            buffer.read_with(cx, |buffer, _| buffer.text()),
3067            "zero\none\ntwo\nthree\nfour\nfive\n"
3068        );
3069        assert_eq!(
3070            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3071            "zero\none\ntwo\nthree\nfour\nfive\n"
3072        );
3073        request.await.unwrap();
3074    }
3075
3076    #[gpui::test]
3077    async fn test_reading_from_line(cx: &mut TestAppContext) {
3078        init_test(cx);
3079
3080        let fs = FakeFs::new(cx.executor());
3081        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3082            .await;
3083        let project = Project::test(fs.clone(), [], cx).await;
3084        project
3085            .update(cx, |project, cx| {
3086                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3087            })
3088            .await
3089            .unwrap();
3090
3091        let connection = Rc::new(FakeAgentConnection::new());
3092
3093        let thread = cx
3094            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3095            .await
3096            .unwrap();
3097
3098        // Whole file
3099        let content = thread
3100            .update(cx, |thread, cx| {
3101                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3102            })
3103            .await
3104            .unwrap();
3105
3106        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3107
3108        // Only start line
3109        let content = thread
3110            .update(cx, |thread, cx| {
3111                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3112            })
3113            .await
3114            .unwrap();
3115
3116        assert_eq!(content, "three\nfour\n");
3117
3118        // Only limit
3119        let content = thread
3120            .update(cx, |thread, cx| {
3121                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3122            })
3123            .await
3124            .unwrap();
3125
3126        assert_eq!(content, "one\ntwo\n");
3127
3128        // Range
3129        let content = thread
3130            .update(cx, |thread, cx| {
3131                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3132            })
3133            .await
3134            .unwrap();
3135
3136        assert_eq!(content, "two\nthree\n");
3137
3138        // Invalid
3139        let err = thread
3140            .update(cx, |thread, cx| {
3141                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3142            })
3143            .await
3144            .unwrap_err();
3145
3146        assert_eq!(
3147            err.to_string(),
3148            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3149        );
3150    }
3151
3152    #[gpui::test]
3153    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3154        init_test(cx);
3155
3156        let fs = FakeFs::new(cx.executor());
3157        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3158        let project = Project::test(fs.clone(), [], cx).await;
3159        project
3160            .update(cx, |project, cx| {
3161                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3162            })
3163            .await
3164            .unwrap();
3165
3166        let connection = Rc::new(FakeAgentConnection::new());
3167
3168        let thread = cx
3169            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3170            .await
3171            .unwrap();
3172
3173        // Whole file
3174        let content = thread
3175            .update(cx, |thread, cx| {
3176                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3177            })
3178            .await
3179            .unwrap();
3180
3181        assert_eq!(content, "");
3182
3183        // Only start line
3184        let content = thread
3185            .update(cx, |thread, cx| {
3186                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3187            })
3188            .await
3189            .unwrap();
3190
3191        assert_eq!(content, "");
3192
3193        // Only limit
3194        let content = thread
3195            .update(cx, |thread, cx| {
3196                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3197            })
3198            .await
3199            .unwrap();
3200
3201        assert_eq!(content, "");
3202
3203        // Range
3204        let content = thread
3205            .update(cx, |thread, cx| {
3206                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3207            })
3208            .await
3209            .unwrap();
3210
3211        assert_eq!(content, "");
3212
3213        // Invalid
3214        let err = thread
3215            .update(cx, |thread, cx| {
3216                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3217            })
3218            .await
3219            .unwrap_err();
3220
3221        assert_eq!(
3222            err.to_string(),
3223            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3224        );
3225    }
3226    #[gpui::test]
3227    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3228        init_test(cx);
3229
3230        let fs = FakeFs::new(cx.executor());
3231        fs.insert_tree(path!("/tmp"), json!({})).await;
3232        let project = Project::test(fs.clone(), [], cx).await;
3233        project
3234            .update(cx, |project, cx| {
3235                project.find_or_create_worktree(path!("/tmp"), true, cx)
3236            })
3237            .await
3238            .unwrap();
3239
3240        let connection = Rc::new(FakeAgentConnection::new());
3241
3242        let thread = cx
3243            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3244            .await
3245            .unwrap();
3246
3247        // Out of project file
3248        let err = thread
3249            .update(cx, |thread, cx| {
3250                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3251            })
3252            .await
3253            .unwrap_err();
3254
3255        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3256    }
3257
3258    #[gpui::test]
3259    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3260        init_test(cx);
3261
3262        let fs = FakeFs::new(cx.executor());
3263        let project = Project::test(fs, [], cx).await;
3264        let id = acp::ToolCallId::new("test");
3265
3266        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3267            let id = id.clone();
3268            move |_, thread, mut cx| {
3269                let id = id.clone();
3270                async move {
3271                    thread
3272                        .update(&mut cx, |thread, cx| {
3273                            thread.handle_session_update(
3274                                acp::SessionUpdate::ToolCall(
3275                                    acp::ToolCall::new(id.clone(), "Label")
3276                                        .kind(acp::ToolKind::Fetch)
3277                                        .status(acp::ToolCallStatus::InProgress),
3278                                ),
3279                                cx,
3280                            )
3281                        })
3282                        .unwrap()
3283                        .unwrap();
3284                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3285                }
3286                .boxed_local()
3287            }
3288        }));
3289
3290        let thread = cx
3291            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3292            .await
3293            .unwrap();
3294
3295        let request = thread.update(cx, |thread, cx| {
3296            thread.send_raw("Fetch https://example.com", cx)
3297        });
3298
3299        run_until_first_tool_call(&thread, cx).await;
3300
3301        thread.read_with(cx, |thread, _| {
3302            assert!(matches!(
3303                thread.entries[1],
3304                AgentThreadEntry::ToolCall(ToolCall {
3305                    status: ToolCallStatus::InProgress,
3306                    ..
3307                })
3308            ));
3309        });
3310
3311        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3312
3313        thread.read_with(cx, |thread, _| {
3314            assert!(matches!(
3315                &thread.entries[1],
3316                AgentThreadEntry::ToolCall(ToolCall {
3317                    status: ToolCallStatus::Canceled,
3318                    ..
3319                })
3320            ));
3321        });
3322
3323        thread
3324            .update(cx, |thread, cx| {
3325                thread.handle_session_update(
3326                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3327                        id,
3328                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3329                    )),
3330                    cx,
3331                )
3332            })
3333            .unwrap();
3334
3335        request.await.unwrap();
3336
3337        thread.read_with(cx, |thread, _| {
3338            assert!(matches!(
3339                thread.entries[1],
3340                AgentThreadEntry::ToolCall(ToolCall {
3341                    status: ToolCallStatus::Completed,
3342                    ..
3343                })
3344            ));
3345        });
3346    }
3347
3348    #[gpui::test]
3349    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3350        init_test(cx);
3351        let fs = FakeFs::new(cx.background_executor.clone());
3352        fs.insert_tree(path!("/test"), json!({})).await;
3353        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3354
3355        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3356            move |_, thread, mut cx| {
3357                async move {
3358                    thread
3359                        .update(&mut cx, |thread, cx| {
3360                            thread.handle_session_update(
3361                                acp::SessionUpdate::ToolCall(
3362                                    acp::ToolCall::new("test", "Label")
3363                                        .kind(acp::ToolKind::Edit)
3364                                        .status(acp::ToolCallStatus::Completed)
3365                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3366                                            "/test/test.txt",
3367                                            "foo",
3368                                        ))]),
3369                                ),
3370                                cx,
3371                            )
3372                        })
3373                        .unwrap()
3374                        .unwrap();
3375                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3376                }
3377                .boxed_local()
3378            }
3379        }));
3380
3381        let thread = cx
3382            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3383            .await
3384            .unwrap();
3385
3386        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3387            .await
3388            .unwrap();
3389
3390        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3391    }
3392
3393    #[gpui::test(iterations = 10)]
3394    async fn test_checkpoints(cx: &mut TestAppContext) {
3395        init_test(cx);
3396        let fs = FakeFs::new(cx.background_executor.clone());
3397        fs.insert_tree(
3398            path!("/test"),
3399            json!({
3400                ".git": {}
3401            }),
3402        )
3403        .await;
3404        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3405
3406        let simulate_changes = Arc::new(AtomicBool::new(true));
3407        let next_filename = Arc::new(AtomicUsize::new(0));
3408        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3409            let simulate_changes = simulate_changes.clone();
3410            let next_filename = next_filename.clone();
3411            let fs = fs.clone();
3412            move |request, thread, mut cx| {
3413                let fs = fs.clone();
3414                let simulate_changes = simulate_changes.clone();
3415                let next_filename = next_filename.clone();
3416                async move {
3417                    if simulate_changes.load(SeqCst) {
3418                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3419                        fs.write(Path::new(&filename), b"").await?;
3420                    }
3421
3422                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3423                        panic!("expected text content block");
3424                    };
3425                    thread.update(&mut cx, |thread, cx| {
3426                        thread
3427                            .handle_session_update(
3428                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3429                                    content.text.to_uppercase().into(),
3430                                )),
3431                                cx,
3432                            )
3433                            .unwrap();
3434                    })?;
3435                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3436                }
3437                .boxed_local()
3438            }
3439        }));
3440        let thread = cx
3441            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3442            .await
3443            .unwrap();
3444
3445        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3446            .await
3447            .unwrap();
3448        thread.read_with(cx, |thread, cx| {
3449            assert_eq!(
3450                thread.to_markdown(cx),
3451                indoc! {"
3452                    ## User (checkpoint)
3453
3454                    Lorem
3455
3456                    ## Assistant
3457
3458                    LOREM
3459
3460                "}
3461            );
3462        });
3463        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3464
3465        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3466            .await
3467            .unwrap();
3468        thread.read_with(cx, |thread, cx| {
3469            assert_eq!(
3470                thread.to_markdown(cx),
3471                indoc! {"
3472                    ## User (checkpoint)
3473
3474                    Lorem
3475
3476                    ## Assistant
3477
3478                    LOREM
3479
3480                    ## User (checkpoint)
3481
3482                    ipsum
3483
3484                    ## Assistant
3485
3486                    IPSUM
3487
3488                "}
3489            );
3490        });
3491        assert_eq!(
3492            fs.files(),
3493            vec![
3494                Path::new(path!("/test/file-0")),
3495                Path::new(path!("/test/file-1"))
3496            ]
3497        );
3498
3499        // Checkpoint isn't stored when there are no changes.
3500        simulate_changes.store(false, SeqCst);
3501        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3502            .await
3503            .unwrap();
3504        thread.read_with(cx, |thread, cx| {
3505            assert_eq!(
3506                thread.to_markdown(cx),
3507                indoc! {"
3508                    ## User (checkpoint)
3509
3510                    Lorem
3511
3512                    ## Assistant
3513
3514                    LOREM
3515
3516                    ## User (checkpoint)
3517
3518                    ipsum
3519
3520                    ## Assistant
3521
3522                    IPSUM
3523
3524                    ## User
3525
3526                    dolor
3527
3528                    ## Assistant
3529
3530                    DOLOR
3531
3532                "}
3533            );
3534        });
3535        assert_eq!(
3536            fs.files(),
3537            vec![
3538                Path::new(path!("/test/file-0")),
3539                Path::new(path!("/test/file-1"))
3540            ]
3541        );
3542
3543        // Rewinding the conversation truncates the history and restores the checkpoint.
3544        thread
3545            .update(cx, |thread, cx| {
3546                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3547                    panic!("unexpected entries {:?}", thread.entries)
3548                };
3549                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3550            })
3551            .await
3552            .unwrap();
3553        thread.read_with(cx, |thread, cx| {
3554            assert_eq!(
3555                thread.to_markdown(cx),
3556                indoc! {"
3557                    ## User (checkpoint)
3558
3559                    Lorem
3560
3561                    ## Assistant
3562
3563                    LOREM
3564
3565                "}
3566            );
3567        });
3568        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3569    }
3570
3571    #[gpui::test]
3572    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3573        use std::sync::atomic::AtomicUsize;
3574        init_test(cx);
3575
3576        let fs = FakeFs::new(cx.executor());
3577        let project = Project::test(fs, None, cx).await;
3578
3579        // Create a connection that simulates refusal after tool result
3580        let prompt_count = Arc::new(AtomicUsize::new(0));
3581        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3582            let prompt_count = prompt_count.clone();
3583            move |_request, thread, mut cx| {
3584                let count = prompt_count.fetch_add(1, SeqCst);
3585                async move {
3586                    if count == 0 {
3587                        // First prompt: Generate a tool call with result
3588                        thread.update(&mut cx, |thread, cx| {
3589                            thread
3590                                .handle_session_update(
3591                                    acp::SessionUpdate::ToolCall(
3592                                        acp::ToolCall::new("tool1", "Test Tool")
3593                                            .kind(acp::ToolKind::Fetch)
3594                                            .status(acp::ToolCallStatus::Completed)
3595                                            .raw_input(serde_json::json!({"query": "test"}))
3596                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
3597                                    ),
3598                                    cx,
3599                                )
3600                                .unwrap();
3601                        })?;
3602
3603                        // Now return refusal because of the tool result
3604                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3605                    } else {
3606                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3607                    }
3608                }
3609                .boxed_local()
3610            }
3611        }));
3612
3613        let thread = cx
3614            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3615            .await
3616            .unwrap();
3617
3618        // Track if we see a Refusal event
3619        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3620        let saw_refusal_event_captured = saw_refusal_event.clone();
3621        thread.update(cx, |_thread, cx| {
3622            cx.subscribe(
3623                &thread,
3624                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3625                    if matches!(event, AcpThreadEvent::Refusal) {
3626                        *saw_refusal_event_captured.lock().unwrap() = true;
3627                    }
3628                },
3629            )
3630            .detach();
3631        });
3632
3633        // Send a user message - this will trigger tool call and then refusal
3634        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3635        cx.background_executor.spawn(send_task).detach();
3636        cx.run_until_parked();
3637
3638        // Verify that:
3639        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3640        // 2. The user message was NOT truncated
3641        assert!(
3642            *saw_refusal_event.lock().unwrap(),
3643            "Refusal event should be emitted for tool result refusals"
3644        );
3645
3646        thread.read_with(cx, |thread, _| {
3647            let entries = thread.entries();
3648            assert!(entries.len() >= 2, "Should have user message and tool call");
3649
3650            // Verify user message is still there
3651            assert!(
3652                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3653                "User message should not be truncated"
3654            );
3655
3656            // Verify tool call is there with result
3657            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3658                assert!(
3659                    tool_call.raw_output.is_some(),
3660                    "Tool call should have output"
3661                );
3662            } else {
3663                panic!("Expected tool call at index 1");
3664            }
3665        });
3666    }
3667
3668    #[gpui::test]
3669    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3670        init_test(cx);
3671
3672        let fs = FakeFs::new(cx.executor());
3673        let project = Project::test(fs, None, cx).await;
3674
3675        let refuse_next = Arc::new(AtomicBool::new(false));
3676        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3677            let refuse_next = refuse_next.clone();
3678            move |_request, _thread, _cx| {
3679                if refuse_next.load(SeqCst) {
3680                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3681                        .boxed_local()
3682                } else {
3683                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3684                        .boxed_local()
3685                }
3686            }
3687        }));
3688
3689        let thread = cx
3690            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3691            .await
3692            .unwrap();
3693
3694        // Track if we see a Refusal event
3695        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3696        let saw_refusal_event_captured = saw_refusal_event.clone();
3697        thread.update(cx, |_thread, cx| {
3698            cx.subscribe(
3699                &thread,
3700                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3701                    if matches!(event, AcpThreadEvent::Refusal) {
3702                        *saw_refusal_event_captured.lock().unwrap() = true;
3703                    }
3704                },
3705            )
3706            .detach();
3707        });
3708
3709        // Send a message that will be refused
3710        refuse_next.store(true, SeqCst);
3711        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3712            .await
3713            .unwrap();
3714
3715        // Verify that a Refusal event WAS emitted for user prompt refusal
3716        assert!(
3717            *saw_refusal_event.lock().unwrap(),
3718            "Refusal event should be emitted for user prompt refusals"
3719        );
3720
3721        // Verify the message was truncated (user prompt refusal)
3722        thread.read_with(cx, |thread, cx| {
3723            assert_eq!(thread.to_markdown(cx), "");
3724        });
3725    }
3726
3727    #[gpui::test]
3728    async fn test_refusal(cx: &mut TestAppContext) {
3729        init_test(cx);
3730        let fs = FakeFs::new(cx.background_executor.clone());
3731        fs.insert_tree(path!("/"), json!({})).await;
3732        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3733
3734        let refuse_next = Arc::new(AtomicBool::new(false));
3735        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3736            let refuse_next = refuse_next.clone();
3737            move |request, thread, mut cx| {
3738                let refuse_next = refuse_next.clone();
3739                async move {
3740                    if refuse_next.load(SeqCst) {
3741                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3742                    }
3743
3744                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3745                        panic!("expected text content block");
3746                    };
3747                    thread.update(&mut cx, |thread, cx| {
3748                        thread
3749                            .handle_session_update(
3750                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3751                                    content.text.to_uppercase().into(),
3752                                )),
3753                                cx,
3754                            )
3755                            .unwrap();
3756                    })?;
3757                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3758                }
3759                .boxed_local()
3760            }
3761        }));
3762        let thread = cx
3763            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3764            .await
3765            .unwrap();
3766
3767        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3768            .await
3769            .unwrap();
3770        thread.read_with(cx, |thread, cx| {
3771            assert_eq!(
3772                thread.to_markdown(cx),
3773                indoc! {"
3774                    ## User
3775
3776                    hello
3777
3778                    ## Assistant
3779
3780                    HELLO
3781
3782                "}
3783            );
3784        });
3785
3786        // Simulate refusing the second message. The message should be truncated
3787        // when a user prompt is refused.
3788        refuse_next.store(true, SeqCst);
3789        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3790            .await
3791            .unwrap();
3792        thread.read_with(cx, |thread, cx| {
3793            assert_eq!(
3794                thread.to_markdown(cx),
3795                indoc! {"
3796                    ## User
3797
3798                    hello
3799
3800                    ## Assistant
3801
3802                    HELLO
3803
3804                "}
3805            );
3806        });
3807    }
3808
3809    async fn run_until_first_tool_call(
3810        thread: &Entity<AcpThread>,
3811        cx: &mut TestAppContext,
3812    ) -> usize {
3813        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3814
3815        let subscription = cx.update(|cx| {
3816            cx.subscribe(thread, move |thread, _, cx| {
3817                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3818                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3819                        return tx.try_send(ix).unwrap();
3820                    }
3821                }
3822            })
3823        });
3824
3825        select! {
3826            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3827                panic!("Timeout waiting for tool call")
3828            }
3829            ix = rx.next().fuse() => {
3830                drop(subscription);
3831                ix.unwrap()
3832            }
3833        }
3834    }
3835
3836    #[derive(Clone, Default)]
3837    struct FakeAgentConnection {
3838        auth_methods: Vec<acp::AuthMethod>,
3839        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3840        on_user_message: Option<
3841            Rc<
3842                dyn Fn(
3843                        acp::PromptRequest,
3844                        WeakEntity<AcpThread>,
3845                        AsyncApp,
3846                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3847                    + 'static,
3848            >,
3849        >,
3850    }
3851
3852    impl FakeAgentConnection {
3853        fn new() -> Self {
3854            Self {
3855                auth_methods: Vec::new(),
3856                on_user_message: None,
3857                sessions: Arc::default(),
3858            }
3859        }
3860
3861        #[expect(unused)]
3862        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3863            self.auth_methods = auth_methods;
3864            self
3865        }
3866
3867        fn on_user_message(
3868            mut self,
3869            handler: impl Fn(
3870                acp::PromptRequest,
3871                WeakEntity<AcpThread>,
3872                AsyncApp,
3873            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3874            + 'static,
3875        ) -> Self {
3876            self.on_user_message.replace(Rc::new(handler));
3877            self
3878        }
3879    }
3880
3881    impl AgentConnection for FakeAgentConnection {
3882        fn telemetry_id(&self) -> SharedString {
3883            "fake".into()
3884        }
3885
3886        fn auth_methods(&self) -> &[acp::AuthMethod] {
3887            &self.auth_methods
3888        }
3889
3890        fn new_session(
3891            self: Rc<Self>,
3892            project: Entity<Project>,
3893            _cwd: &Path,
3894            cx: &mut App,
3895        ) -> Task<gpui::Result<Entity<AcpThread>>> {
3896            let session_id = acp::SessionId::new(
3897                rand::rng()
3898                    .sample_iter(&distr::Alphanumeric)
3899                    .take(7)
3900                    .map(char::from)
3901                    .collect::<String>(),
3902            );
3903            let action_log = cx.new(|_| ActionLog::new(project.clone()));
3904            let thread = cx.new(|cx| {
3905                AcpThread::new(
3906                    None,
3907                    "Test",
3908                    self.clone(),
3909                    project,
3910                    action_log,
3911                    session_id.clone(),
3912                    watch::Receiver::constant(
3913                        acp::PromptCapabilities::new()
3914                            .image(true)
3915                            .audio(true)
3916                            .embedded_context(true),
3917                    ),
3918                    cx,
3919                )
3920            });
3921            self.sessions.lock().insert(session_id, thread.downgrade());
3922            Task::ready(Ok(thread))
3923        }
3924
3925        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3926            if self.auth_methods().iter().any(|m| m.id == method) {
3927                Task::ready(Ok(()))
3928            } else {
3929                Task::ready(Err(anyhow!("Invalid Auth Method")))
3930            }
3931        }
3932
3933        fn prompt(
3934            &self,
3935            _id: Option<UserMessageId>,
3936            params: acp::PromptRequest,
3937            cx: &mut App,
3938        ) -> Task<gpui::Result<acp::PromptResponse>> {
3939            let sessions = self.sessions.lock();
3940            let thread = sessions.get(&params.session_id).unwrap();
3941            if let Some(handler) = &self.on_user_message {
3942                let handler = handler.clone();
3943                let thread = thread.clone();
3944                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3945            } else {
3946                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3947            }
3948        }
3949
3950        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3951            let sessions = self.sessions.lock();
3952            let thread = sessions.get(session_id).unwrap().clone();
3953
3954            cx.spawn(async move |cx| {
3955                thread
3956                    .update(cx, |thread, cx| thread.cancel(cx))
3957                    .unwrap()
3958                    .await
3959            })
3960            .detach();
3961        }
3962
3963        fn truncate(
3964            &self,
3965            session_id: &acp::SessionId,
3966            _cx: &App,
3967        ) -> Option<Rc<dyn AgentSessionTruncate>> {
3968            Some(Rc::new(FakeAgentSessionEditor {
3969                _session_id: session_id.clone(),
3970            }))
3971        }
3972
3973        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3974            self
3975        }
3976    }
3977
3978    struct FakeAgentSessionEditor {
3979        _session_id: acp::SessionId,
3980    }
3981
3982    impl AgentSessionTruncate for FakeAgentSessionEditor {
3983        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3984            Task::ready(Ok(()))
3985        }
3986    }
3987
3988    #[gpui::test]
3989    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3990        init_test(cx);
3991
3992        let fs = FakeFs::new(cx.executor());
3993        let project = Project::test(fs, [], cx).await;
3994        let connection = Rc::new(FakeAgentConnection::new());
3995        let thread = cx
3996            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3997            .await
3998            .unwrap();
3999
4000        // Try to update a tool call that doesn't exist
4001        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4002        thread.update(cx, |thread, cx| {
4003            let result = thread.handle_session_update(
4004                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4005                    nonexistent_id.clone(),
4006                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4007                )),
4008                cx,
4009            );
4010
4011            // The update should succeed (not return an error)
4012            assert!(result.is_ok());
4013
4014            // There should now be exactly one entry in the thread
4015            assert_eq!(thread.entries.len(), 1);
4016
4017            // The entry should be a failed tool call
4018            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4019                assert_eq!(tool_call.id, nonexistent_id);
4020                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4021                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4022
4023                // Check that the content contains the error message
4024                assert_eq!(tool_call.content.len(), 1);
4025                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4026                    match content_block {
4027                        ContentBlock::Markdown { markdown } => {
4028                            let markdown_text = markdown.read(cx).source();
4029                            assert!(markdown_text.contains("Tool call not found"));
4030                        }
4031                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4032                        ContentBlock::ResourceLink { .. } => {
4033                            panic!("Expected markdown content, got resource link")
4034                        }
4035                        ContentBlock::Image { .. } => {
4036                            panic!("Expected markdown content, got image")
4037                        }
4038                    }
4039                } else {
4040                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4041                }
4042            } else {
4043                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4044            }
4045        });
4046    }
4047
4048    /// Tests that restoring a checkpoint properly cleans up terminals that were
4049    /// created after that checkpoint, and cancels any in-progress generation.
4050    ///
4051    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4052    /// that were started after that checkpoint should be terminated, and any in-progress
4053    /// AI generation should be canceled.
4054    #[gpui::test]
4055    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4056        init_test(cx);
4057
4058        let fs = FakeFs::new(cx.executor());
4059        let project = Project::test(fs, [], cx).await;
4060        let connection = Rc::new(FakeAgentConnection::new());
4061        let thread = cx
4062            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4063            .await
4064            .unwrap();
4065
4066        // Send first user message to create a checkpoint
4067        cx.update(|cx| {
4068            thread.update(cx, |thread, cx| {
4069                thread.send(vec!["first message".into()], cx)
4070            })
4071        })
4072        .await
4073        .unwrap();
4074
4075        // Send second message (creates another checkpoint) - we'll restore to this one
4076        cx.update(|cx| {
4077            thread.update(cx, |thread, cx| {
4078                thread.send(vec!["second message".into()], cx)
4079            })
4080        })
4081        .await
4082        .unwrap();
4083
4084        // Create 2 terminals BEFORE the checkpoint that have completed running
4085        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4086        let mock_terminal_1 = cx.new(|cx| {
4087            let builder = ::terminal::TerminalBuilder::new_display_only(
4088                ::terminal::terminal_settings::CursorShape::default(),
4089                ::terminal::terminal_settings::AlternateScroll::On,
4090                None,
4091                0,
4092                cx.background_executor(),
4093                PathStyle::local(),
4094            )
4095            .unwrap();
4096            builder.subscribe(cx)
4097        });
4098
4099        thread.update(cx, |thread, cx| {
4100            thread.on_terminal_provider_event(
4101                TerminalProviderEvent::Created {
4102                    terminal_id: terminal_id_1.clone(),
4103                    label: "echo 'first'".to_string(),
4104                    cwd: Some(PathBuf::from("/test")),
4105                    output_byte_limit: None,
4106                    terminal: mock_terminal_1.clone(),
4107                },
4108                cx,
4109            );
4110        });
4111
4112        thread.update(cx, |thread, cx| {
4113            thread.on_terminal_provider_event(
4114                TerminalProviderEvent::Output {
4115                    terminal_id: terminal_id_1.clone(),
4116                    data: b"first\n".to_vec(),
4117                },
4118                cx,
4119            );
4120        });
4121
4122        thread.update(cx, |thread, cx| {
4123            thread.on_terminal_provider_event(
4124                TerminalProviderEvent::Exit {
4125                    terminal_id: terminal_id_1.clone(),
4126                    status: acp::TerminalExitStatus::new().exit_code(0),
4127                },
4128                cx,
4129            );
4130        });
4131
4132        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4133        let mock_terminal_2 = cx.new(|cx| {
4134            let builder = ::terminal::TerminalBuilder::new_display_only(
4135                ::terminal::terminal_settings::CursorShape::default(),
4136                ::terminal::terminal_settings::AlternateScroll::On,
4137                None,
4138                0,
4139                cx.background_executor(),
4140                PathStyle::local(),
4141            )
4142            .unwrap();
4143            builder.subscribe(cx)
4144        });
4145
4146        thread.update(cx, |thread, cx| {
4147            thread.on_terminal_provider_event(
4148                TerminalProviderEvent::Created {
4149                    terminal_id: terminal_id_2.clone(),
4150                    label: "echo 'second'".to_string(),
4151                    cwd: Some(PathBuf::from("/test")),
4152                    output_byte_limit: None,
4153                    terminal: mock_terminal_2.clone(),
4154                },
4155                cx,
4156            );
4157        });
4158
4159        thread.update(cx, |thread, cx| {
4160            thread.on_terminal_provider_event(
4161                TerminalProviderEvent::Output {
4162                    terminal_id: terminal_id_2.clone(),
4163                    data: b"second\n".to_vec(),
4164                },
4165                cx,
4166            );
4167        });
4168
4169        thread.update(cx, |thread, cx| {
4170            thread.on_terminal_provider_event(
4171                TerminalProviderEvent::Exit {
4172                    terminal_id: terminal_id_2.clone(),
4173                    status: acp::TerminalExitStatus::new().exit_code(0),
4174                },
4175                cx,
4176            );
4177        });
4178
4179        // Get the second message ID to restore to
4180        let second_message_id = thread.read_with(cx, |thread, _| {
4181            // At this point we have:
4182            // - Index 0: First user message (with checkpoint)
4183            // - Index 1: Second user message (with checkpoint)
4184            // No assistant responses because FakeAgentConnection just returns EndTurn
4185            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4186                panic!("expected user message at index 1");
4187            };
4188            message.id.clone().unwrap()
4189        });
4190
4191        // Create a terminal AFTER the checkpoint we'll restore to.
4192        // This simulates the AI agent starting a long-running terminal command.
4193        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4194        let mock_terminal = cx.new(|cx| {
4195            let builder = ::terminal::TerminalBuilder::new_display_only(
4196                ::terminal::terminal_settings::CursorShape::default(),
4197                ::terminal::terminal_settings::AlternateScroll::On,
4198                None,
4199                0,
4200                cx.background_executor(),
4201                PathStyle::local(),
4202            )
4203            .unwrap();
4204            builder.subscribe(cx)
4205        });
4206
4207        // Register the terminal as created
4208        thread.update(cx, |thread, cx| {
4209            thread.on_terminal_provider_event(
4210                TerminalProviderEvent::Created {
4211                    terminal_id: terminal_id.clone(),
4212                    label: "sleep 1000".to_string(),
4213                    cwd: Some(PathBuf::from("/test")),
4214                    output_byte_limit: None,
4215                    terminal: mock_terminal.clone(),
4216                },
4217                cx,
4218            );
4219        });
4220
4221        // Simulate the terminal producing output (still running)
4222        thread.update(cx, |thread, cx| {
4223            thread.on_terminal_provider_event(
4224                TerminalProviderEvent::Output {
4225                    terminal_id: terminal_id.clone(),
4226                    data: b"terminal is running...\n".to_vec(),
4227                },
4228                cx,
4229            );
4230        });
4231
4232        // Create a tool call entry that references this terminal
4233        // This represents the agent requesting a terminal command
4234        thread.update(cx, |thread, cx| {
4235            thread
4236                .handle_session_update(
4237                    acp::SessionUpdate::ToolCall(
4238                        acp::ToolCall::new("terminal-tool-1", "Running command")
4239                            .kind(acp::ToolKind::Execute)
4240                            .status(acp::ToolCallStatus::InProgress)
4241                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4242                                terminal_id.clone(),
4243                            ))])
4244                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4245                    ),
4246                    cx,
4247                )
4248                .unwrap();
4249        });
4250
4251        // Verify terminal exists and is in the thread
4252        let terminal_exists_before =
4253            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4254        assert!(
4255            terminal_exists_before,
4256            "Terminal should exist before checkpoint restore"
4257        );
4258
4259        // Verify the terminal's underlying task is still running (not completed)
4260        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4261            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4262            terminal_entity.read_with(cx, |term, _cx| {
4263                term.output().is_none() // output is None means it's still running
4264            })
4265        });
4266        assert!(
4267            terminal_running_before,
4268            "Terminal should be running before checkpoint restore"
4269        );
4270
4271        // Verify we have the expected entries before restore
4272        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4273        assert!(
4274            entry_count_before > 1,
4275            "Should have multiple entries before restore"
4276        );
4277
4278        // Restore the checkpoint to the second message.
4279        // This should:
4280        // 1. Cancel any in-progress generation (via the cancel() call)
4281        // 2. Remove the terminal that was created after that point
4282        thread
4283            .update(cx, |thread, cx| {
4284                thread.restore_checkpoint(second_message_id, cx)
4285            })
4286            .await
4287            .unwrap();
4288
4289        // Verify that no send_task is in progress after restore
4290        // (cancel() clears the send_task)
4291        let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4292        assert!(
4293            !has_send_task_after,
4294            "Should not have a send_task after restore (cancel should have cleared it)"
4295        );
4296
4297        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4298        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4299        assert_eq!(
4300            entry_count, 1,
4301            "Should have 1 entry after restore (only the first user message)"
4302        );
4303
4304        // Verify the 2 completed terminals from before the checkpoint still exist
4305        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4306            thread.terminals.contains_key(&terminal_id_1)
4307        });
4308        assert!(
4309            terminal_1_exists,
4310            "Terminal 1 (from before checkpoint) should still exist"
4311        );
4312
4313        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4314            thread.terminals.contains_key(&terminal_id_2)
4315        });
4316        assert!(
4317            terminal_2_exists,
4318            "Terminal 2 (from before checkpoint) should still exist"
4319        );
4320
4321        // Verify they're still in completed state
4322        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4323            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4324            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4325        });
4326        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4327
4328        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4329            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4330            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4331        });
4332        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4333
4334        // Verify the running terminal (created after checkpoint) was removed
4335        let terminal_3_exists =
4336            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4337        assert!(
4338            !terminal_3_exists,
4339            "Terminal 3 (created after checkpoint) should have been removed"
4340        );
4341
4342        // Verify total count is 2 (the two from before the checkpoint)
4343        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4344        assert_eq!(
4345            terminal_count, 2,
4346            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4347        );
4348    }
4349
4350    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4351    /// even when a new user message is added while the async checkpoint comparison is in progress.
4352    ///
4353    /// This is a regression test for a bug where update_last_checkpoint would fail with
4354    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4355    /// update_last_checkpoint started and when its async closure ran.
4356    #[gpui::test]
4357    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4358        init_test(cx);
4359
4360        let fs = FakeFs::new(cx.executor());
4361        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4362            .await;
4363        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4364
4365        let handler_done = Arc::new(AtomicBool::new(false));
4366        let handler_done_clone = handler_done.clone();
4367        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4368            move |_, _thread, _cx| {
4369                handler_done_clone.store(true, SeqCst);
4370                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4371            },
4372        ));
4373
4374        let thread = cx
4375            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4376            .await
4377            .unwrap();
4378
4379        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4380        let send_task = cx.background_executor.spawn(send_future);
4381
4382        // Tick until handler completes, then a few more to let update_last_checkpoint start
4383        while !handler_done.load(SeqCst) {
4384            cx.executor().tick();
4385        }
4386        for _ in 0..5 {
4387            cx.executor().tick();
4388        }
4389
4390        thread.update(cx, |thread, cx| {
4391            thread.push_entry(
4392                AgentThreadEntry::UserMessage(UserMessage {
4393                    id: Some(UserMessageId::new()),
4394                    content: ContentBlock::Empty,
4395                    chunks: vec!["Injected message (no checkpoint)".into()],
4396                    checkpoint: None,
4397                    indented: false,
4398                }),
4399                cx,
4400            );
4401        });
4402
4403        cx.run_until_parked();
4404        let result = send_task.await;
4405
4406        assert!(
4407            result.is_ok(),
4408            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4409            result.err()
4410        );
4411    }
4412}