acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use collections::HashSet;
   7pub use connection::*;
   8pub use diff::*;
   9use language::language_settings::FormatOnSave;
  10pub use mention::*;
  11use project::lsp_store::{FormatTrigger, LspFormatTarget};
  12use serde::{Deserialize, Serialize};
  13pub use terminal::*;
  14
  15use action_log::ActionLog;
  16use agent_client_protocol as acp;
  17use anyhow::{Context as _, Result, anyhow};
  18use editor::Bias;
  19use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  20use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  21use itertools::Itertools;
  22use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  23use markdown::Markdown;
  24use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  25use std::collections::HashMap;
  26use std::error::Error;
  27use std::fmt::{Formatter, Write};
  28use std::ops::Range;
  29use std::process::ExitStatus;
  30use std::rc::Rc;
  31use std::time::{Duration, Instant};
  32use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  33use ui::App;
  34use util::ResultExt;
  35
/// A user-authored entry in an agent thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one is known.
    pub id: Option<UserMessageId>,
    /// Content accumulated from every chunk appended to this message.
    pub content: ContentBlock,
    /// The protocol content blocks that make up the message, in the order
    /// they were pushed.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  43
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message is rendered under the
    /// "## User (checkpoint)" heading in its markdown form.
    pub show: bool,
}
  49
  50impl UserMessage {
  51    fn to_markdown(&self, cx: &App) -> String {
  52        let mut markdown = String::new();
  53        if self
  54            .checkpoint
  55            .as_ref()
  56            .is_some_and(|checkpoint| checkpoint.show)
  57        {
  58            writeln!(markdown, "## User (checkpoint)").unwrap();
  59        } else {
  60            writeln!(markdown, "## User").unwrap();
  61        }
  62        writeln!(markdown).unwrap();
  63        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  64        writeln!(markdown).unwrap();
  65        markdown
  66    }
  67}
  68
/// An assistant-authored entry in an agent thread.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message and thought chunks that make up this entry, in order.
    pub chunks: Vec<AssistantMessageChunk>,
}
  73
  74impl AssistantMessage {
  75    pub fn to_markdown(&self, cx: &App) -> String {
  76        format!(
  77            "## Assistant\n\n{}\n\n",
  78            self.chunks
  79                .iter()
  80                .map(|chunk| chunk.to_markdown(cx))
  81                .join("\n\n")
  82        )
  83    }
  84}
  85
/// One contiguous piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content.
    Message { block: ContentBlock },
    /// Thought content; rendered inside `<thinking>` tags in markdown.
    Thought { block: ContentBlock },
}
  91
  92impl AssistantMessageChunk {
  93    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  94        Self::Message {
  95            block: ContentBlock::new(chunk.into(), language_registry, cx),
  96        }
  97    }
  98
  99    fn to_markdown(&self, cx: &App) -> String {
 100        match self {
 101            Self::Message { block } => block.to_markdown(cx).to_string(),
 102            Self::Thought { block } => {
 103                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 104            }
 105        }
 106    }
 107}
 108
/// A single entry in an agent thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message (including thoughts) produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation.
    ToolCall(ToolCall),
}
 115
 116impl AgentThreadEntry {
 117    pub fn to_markdown(&self, cx: &App) -> String {
 118        match self {
 119            Self::UserMessage(message) => message.to_markdown(cx),
 120            Self::AssistantMessage(message) => message.to_markdown(cx),
 121            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 122        }
 123    }
 124
 125    pub fn user_message(&self) -> Option<&UserMessage> {
 126        if let AgentThreadEntry::UserMessage(message) = self {
 127            Some(message)
 128        } else {
 129            None
 130        }
 131    }
 132
 133    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 134        if let AgentThreadEntry::ToolCall(call) = self {
 135            itertools::Either::Left(call.diffs())
 136        } else {
 137            itertools::Either::Right(std::iter::empty())
 138        }
 139    }
 140
 141    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 142        if let AgentThreadEntry::ToolCall(call) = self {
 143            itertools::Either::Left(call.terminals())
 144        } else {
 145            itertools::Either::Right(std::iter::empty())
 146        }
 147    }
 148
 149    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 150        if let AgentThreadEntry::ToolCall(ToolCall {
 151            locations,
 152            resolved_locations,
 153            ..
 154        }) = self
 155        {
 156            Some((
 157                locations.get(ix)?.clone(),
 158                resolved_locations.get(ix)?.clone()?,
 159            ))
 160        } else {
 161            None
 162        }
 163    }
 164}
 165
/// A tool invocation tracked by the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the tool call's title.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported by the protocol.
    pub kind: acp::ToolKind,
    /// Content items attached to the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current status of the tool call.
    pub status: ToolCallStatus,
    /// Locations reported for the tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`; `AgentThreadEntry::location`
    /// reads both lists at the same index. An entry is `None` when the
    /// location could not be resolved.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the tool call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw JSON output of the tool call, if provided.
    pub raw_output: Option<serde_json::Value>,
}
 178
 179impl ToolCall {
 180    fn from_acp(
 181        tool_call: acp::ToolCall,
 182        status: ToolCallStatus,
 183        language_registry: Arc<LanguageRegistry>,
 184        cx: &mut App,
 185    ) -> Self {
 186        Self {
 187            id: tool_call.id,
 188            label: cx.new(|cx| {
 189                Markdown::new(
 190                    tool_call.title.into(),
 191                    Some(language_registry.clone()),
 192                    None,
 193                    cx,
 194                )
 195            }),
 196            kind: tool_call.kind,
 197            content: tool_call
 198                .content
 199                .into_iter()
 200                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 201                .collect(),
 202            locations: tool_call.locations,
 203            resolved_locations: Vec::default(),
 204            status,
 205            raw_input: tool_call.raw_input,
 206            raw_output: tool_call.raw_output,
 207        }
 208    }
 209
 210    fn update_fields(
 211        &mut self,
 212        fields: acp::ToolCallUpdateFields,
 213        language_registry: Arc<LanguageRegistry>,
 214        cx: &mut App,
 215    ) {
 216        let acp::ToolCallUpdateFields {
 217            kind,
 218            status,
 219            title,
 220            content,
 221            locations,
 222            raw_input,
 223            raw_output,
 224        } = fields;
 225
 226        if let Some(kind) = kind {
 227            self.kind = kind;
 228        }
 229
 230        if let Some(status) = status {
 231            self.status = status.into();
 232        }
 233
 234        if let Some(title) = title {
 235            self.label.update(cx, |label, cx| {
 236                label.replace(title, cx);
 237            });
 238        }
 239
 240        if let Some(content) = content {
 241            self.content = content
 242                .into_iter()
 243                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 244                .collect();
 245        }
 246
 247        if let Some(locations) = locations {
 248            self.locations = locations;
 249        }
 250
 251        if let Some(raw_input) = raw_input {
 252            self.raw_input = Some(raw_input);
 253        }
 254
 255        if let Some(raw_output) = raw_output {
 256            if self.content.is_empty()
 257                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 258            {
 259                self.content
 260                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 261                        markdown,
 262                    }));
 263            }
 264            self.raw_output = Some(raw_output);
 265        }
 266    }
 267
 268    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 269        self.content.iter().filter_map(|content| match content {
 270            ToolCallContent::Diff(diff) => Some(diff),
 271            ToolCallContent::ContentBlock(_) => None,
 272            ToolCallContent::Terminal(_) => None,
 273        })
 274    }
 275
 276    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 277        self.content.iter().filter_map(|content| match content {
 278            ToolCallContent::Terminal(terminal) => Some(terminal),
 279            ToolCallContent::ContentBlock(_) => None,
 280            ToolCallContent::Diff(_) => None,
 281        })
 282    }
 283
 284    fn to_markdown(&self, cx: &App) -> String {
 285        let mut markdown = format!(
 286            "**Tool Call: {}**\nStatus: {}\n\n",
 287            self.label.read(cx).source(),
 288            self.status
 289        );
 290        for content in &self.content {
 291            markdown.push_str(content.to_markdown(cx).as_str());
 292            markdown.push_str("\n\n");
 293        }
 294        markdown
 295    }
 296
 297    async fn resolve_location(
 298        location: acp::ToolCallLocation,
 299        project: WeakEntity<Project>,
 300        cx: &mut AsyncApp,
 301    ) -> Option<AgentLocation> {
 302        let buffer = project
 303            .update(cx, |project, cx| {
 304                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 305                    Some(project.open_buffer(path, cx))
 306                } else {
 307                    None
 308                }
 309            })
 310            .ok()??;
 311        let buffer = buffer.await.log_err()?;
 312        let position = buffer
 313            .update(cx, |buffer, _| {
 314                if let Some(row) = location.line {
 315                    let snapshot = buffer.snapshot();
 316                    let column = snapshot.indent_size_for_line(row).len;
 317                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 318                    snapshot.anchor_before(point)
 319                } else {
 320                    Anchor::MIN
 321                }
 322            })
 323            .ok()?;
 324
 325        Some(AgentLocation {
 326            buffer: buffer.downgrade(),
 327            position,
 328        })
 329    }
 330
 331    fn resolve_locations(
 332        &self,
 333        project: Entity<Project>,
 334        cx: &mut App,
 335    ) -> Task<Vec<Option<AgentLocation>>> {
 336        let locations = self.locations.clone();
 337        project.update(cx, |_, cx| {
 338            cx.spawn(async move |project, cx| {
 339                let mut new_locations = Vec::new();
 340                for location in locations {
 341                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 342                }
 343                new_locations
 344            })
 345        })
 346    }
 347}
 348
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered to the user.
        options: Vec<acp::PermissionOption>,
        /// Channel carrying a permission option id.
        /// NOTE(review): presumably used to deliver the user's choice — confirm.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 370
 371impl From<acp::ToolCallStatus> for ToolCallStatus {
 372    fn from(status: acp::ToolCallStatus) -> Self {
 373        match status {
 374            acp::ToolCallStatus::Pending => Self::Pending,
 375            acp::ToolCallStatus::InProgress => Self::InProgress,
 376            acp::ToolCallStatus::Completed => Self::Completed,
 377            acp::ToolCallStatus::Failed => Self::Failed,
 378        }
 379    }
 380}
 381
 382impl Display for ToolCallStatus {
 383    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 384        write!(
 385            f,
 386            "{}",
 387            match self {
 388                ToolCallStatus::Pending => "Pending",
 389                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 390                ToolCallStatus::InProgress => "In Progress",
 391                ToolCallStatus::Completed => "Completed",
 392                ToolCallStatus::Failed => "Failed",
 393                ToolCallStatus::Rejected => "Rejected",
 394                ToolCallStatus::Canceled => "Canceled",
 395            }
 396        )
 397    }
 398}
 399
/// Displayable content built up from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text held in a `Markdown` entity.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link; it becomes markdown once more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
 406
 407impl ContentBlock {
 408    pub fn new(
 409        block: acp::ContentBlock,
 410        language_registry: &Arc<LanguageRegistry>,
 411        cx: &mut App,
 412    ) -> Self {
 413        let mut this = Self::Empty;
 414        this.append(block, language_registry, cx);
 415        this
 416    }
 417
 418    pub fn new_combined(
 419        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 420        language_registry: Arc<LanguageRegistry>,
 421        cx: &mut App,
 422    ) -> Self {
 423        let mut this = Self::Empty;
 424        for block in blocks {
 425            this.append(block, &language_registry, cx);
 426        }
 427        this
 428    }
 429
 430    pub fn append(
 431        &mut self,
 432        block: acp::ContentBlock,
 433        language_registry: &Arc<LanguageRegistry>,
 434        cx: &mut App,
 435    ) {
 436        if matches!(self, ContentBlock::Empty)
 437            && let acp::ContentBlock::ResourceLink(resource_link) = block
 438        {
 439            *self = ContentBlock::ResourceLink { resource_link };
 440            return;
 441        }
 442
 443        let new_content = self.block_string_contents(block);
 444
 445        match self {
 446            ContentBlock::Empty => {
 447                *self = Self::create_markdown_block(new_content, language_registry, cx);
 448            }
 449            ContentBlock::Markdown { markdown } => {
 450                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 451            }
 452            ContentBlock::ResourceLink { resource_link } => {
 453                let existing_content = Self::resource_link_md(&resource_link.uri);
 454                let combined = format!("{}\n{}", existing_content, new_content);
 455
 456                *self = Self::create_markdown_block(combined, language_registry, cx);
 457            }
 458        }
 459    }
 460
 461    fn create_markdown_block(
 462        content: String,
 463        language_registry: &Arc<LanguageRegistry>,
 464        cx: &mut App,
 465    ) -> ContentBlock {
 466        ContentBlock::Markdown {
 467            markdown: cx
 468                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 469        }
 470    }
 471
 472    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 473        match block {
 474            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 475            acp::ContentBlock::ResourceLink(resource_link) => {
 476                Self::resource_link_md(&resource_link.uri)
 477            }
 478            acp::ContentBlock::Resource(acp::EmbeddedResource {
 479                resource:
 480                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 481                        uri,
 482                        ..
 483                    }),
 484                ..
 485            }) => Self::resource_link_md(&uri),
 486            acp::ContentBlock::Image(image) => Self::image_md(&image),
 487            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 488        }
 489    }
 490
 491    fn resource_link_md(uri: &str) -> String {
 492        if let Some(uri) = MentionUri::parse(uri).log_err() {
 493            uri.as_link().to_string()
 494        } else {
 495            uri.to_string()
 496        }
 497    }
 498
 499    fn image_md(_image: &acp::ImageContent) -> String {
 500        "`Image`".into()
 501    }
 502
 503    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 504        match self {
 505            ContentBlock::Empty => "",
 506            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 507            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 508        }
 509    }
 510
 511    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 512        match self {
 513            ContentBlock::Empty => None,
 514            ContentBlock::Markdown { markdown } => Some(markdown),
 515            ContentBlock::ResourceLink { .. } => None,
 516        }
 517    }
 518
 519    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 520        match self {
 521            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 522            _ => None,
 523        }
 524    }
 525}
 526
/// A single content item attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Textual or link content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 533
 534impl ToolCallContent {
 535    pub fn from_acp(
 536        content: acp::ToolCallContent,
 537        language_registry: Arc<LanguageRegistry>,
 538        cx: &mut App,
 539    ) -> Self {
 540        match content {
 541            acp::ToolCallContent::Content { content } => {
 542                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 543            }
 544            acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
 545                Diff::finalized(
 546                    diff.path,
 547                    diff.old_text,
 548                    diff.new_text,
 549                    language_registry,
 550                    cx,
 551                )
 552            })),
 553        }
 554    }
 555
 556    pub fn to_markdown(&self, cx: &App) -> String {
 557        match self {
 558            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 559            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 560            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 561        }
 562    }
 563}
 564
/// An update to apply to an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol-level update carrying optional replacement fields.
    UpdateFields(acp::ToolCallUpdate),
    /// Attaches a diff to the tool call.
    UpdateDiff(ToolCallUpdateDiff),
    /// Attaches a terminal to the tool call.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 571
 572impl ToolCallUpdate {
 573    fn id(&self) -> &acp::ToolCallId {
 574        match self {
 575            Self::UpdateFields(update) => &update.id,
 576            Self::UpdateDiff(diff) => &diff.id,
 577            Self::UpdateTerminal(terminal) => &terminal.id,
 578        }
 579    }
 580}
 581
 582impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 583    fn from(update: acp::ToolCallUpdate) -> Self {
 584        Self::UpdateFields(update)
 585    }
 586}
 587
 588impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 589    fn from(diff: ToolCallUpdateDiff) -> Self {
 590        Self::UpdateDiff(diff)
 591    }
 592}
 593
/// Update that attaches a diff entity to a tool call.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// The diff to attach.
    pub diff: Entity<Diff>,
}
 599
 600impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 601    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 602        Self::UpdateTerminal(terminal)
 603    }
 604}
 605
/// Update that attaches a terminal entity to a tool call.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// The terminal to attach.
    pub terminal: Entity<Terminal>,
}
 611
/// The plan tracked by a thread: an ordered list of entries.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
 616
/// Summary of a [`Plan`], as computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 623
 624impl Plan {
 625    pub fn is_empty(&self) -> bool {
 626        self.entries.is_empty()
 627    }
 628
 629    pub fn stats(&self) -> PlanStats<'_> {
 630        let mut stats = PlanStats {
 631            in_progress_entry: None,
 632            pending: 0,
 633            completed: 0,
 634        };
 635
 636        for entry in &self.entries {
 637            match &entry.status {
 638                acp::PlanEntryStatus::Pending => {
 639                    stats.pending += 1;
 640                }
 641                acp::PlanEntryStatus::InProgress => {
 642                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 643                }
 644                acp::PlanEntryStatus::Completed => {
 645                    stats.completed += 1;
 646                }
 647            }
 648        }
 649
 650        stats
 651    }
 652}
 653
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's content.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as reported by the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status of the entry (pending, in progress, or completed).
    pub status: acp::PlanEntryStatus,
}
 660
 661impl PlanEntry {
 662    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 663        Self {
 664            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 665            priority: entry.priority,
 666            status: entry.status,
 667        }
 668    }
 669}
 670
/// Token usage figures reported for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// NOTE(review): presumably the maximum number of tokens available — confirm.
    pub max_tokens: u64,
    /// NOTE(review): presumably the number of tokens used so far — confirm.
    pub used_tokens: u64,
}
 676
/// Payload of [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// NOTE(review): presumably the error that triggered the retry — confirm.
    pub last_error: SharedString,
    /// NOTE(review): presumably the current attempt number — confirm.
    pub attempt: usize,
    /// NOTE(review): presumably the maximum number of attempts — confirm.
    pub max_attempts: usize,
    /// NOTE(review): presumably when the retry wait started — confirm.
    pub started_at: Instant,
    /// NOTE(review): presumably how long to wait before retrying — confirm.
    pub duration: Duration,
}
 685
/// State of a single agent conversation.
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Entries of the thread, in the order they were pushed.
    entries: Vec<AgentThreadEntry>,
    /// The thread's plan.
    plan: Plan,
    /// The project this thread operates on; also the source of the language
    /// registry used when building content blocks.
    project: Entity<Project>,
    /// Action log associated with this thread.
    action_log: Entity<ActionLog>,
    /// Buffers paired with a snapshot.
    /// NOTE(review): presumably buffers shared with the agent — confirm.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// While this is `Some`, `status` reports the thread as not idle.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Protocol session id of this thread.
    session_id: acp::SessionId,
    /// Latest token usage, if any has been reported.
    token_usage: Option<TokenUsage>,
}
 698
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The thread's title changed.
    TitleUpdated,
    /// The thread's token usage changed.
    TokenUsageUpdated,
    /// The entry at the given index was updated.
    EntryUpdated(usize),
    /// The entries in the given index range were removed.
    EntriesRemoved(Range<usize>),
    /// NOTE(review): presumably emitted when a tool call needs user
    /// authorization — confirm against the emitting code.
    ToolAuthorizationRequired,
    /// A retry is in progress; carries its status.
    Retry(RetryStatus),
    /// NOTE(review): presumably emitted when generation stops — confirm.
    Stopped,
    /// NOTE(review): presumably emitted when a turn fails — confirm.
    Error,
    /// Carries a [`LoadError`].
    LoadError(LoadError),
}
 712
// Lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 714
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is present.
    Idle,
    /// A send task is present and the thread is waiting for a tool confirmation.
    WaitingForToolConfirmation,
    /// A send task is present and no tool confirmation is awaited.
    Generating,
}
 721
/// Error carried by `AcpThreadEvent::LoadError`.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// NOTE(review): presumably the agent is not installed — confirm.
    NotInstalled {
        /// Message shown by the `Display` implementation.
        error_message: SharedString,
        /// Message accompanying the install command.
        install_message: SharedString,
        /// Command associated with installing.
        install_command: String,
    },
    /// NOTE(review): presumably the installed agent version is unsupported — confirm.
    Unsupported {
        /// Message shown by the `Display` implementation.
        error_message: SharedString,
        /// Message accompanying the upgrade command.
        upgrade_message: SharedString,
        /// Command associated with upgrading.
        upgrade_command: String,
    },
    /// A process exited with the given status; displayed as
    /// "Server exited with status …".
    Exited {
        status: ExitStatus,
    },
    /// Any other error, described by its message.
    Other(SharedString),
}
 739
 740impl Display for LoadError {
 741    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 742        match self {
 743            LoadError::NotInstalled { error_message, .. }
 744            | LoadError::Unsupported { error_message, .. } => {
 745                write!(f, "{error_message}")
 746            }
 747            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
 748            LoadError::Other(msg) => write!(f, "{}", msg),
 749        }
 750    }
 751}
 752
// `LoadError` uses the default `std::error::Error` methods.
impl Error for LoadError {}
 754
 755impl AcpThread {
 756    pub fn new(
 757        title: impl Into<SharedString>,
 758        connection: Rc<dyn AgentConnection>,
 759        project: Entity<Project>,
 760        action_log: Entity<ActionLog>,
 761        session_id: acp::SessionId,
 762    ) -> Self {
 763        Self {
 764            action_log,
 765            shared_buffers: Default::default(),
 766            entries: Default::default(),
 767            plan: Default::default(),
 768            title: title.into(),
 769            project,
 770            send_task: None,
 771            connection,
 772            session_id,
 773            token_usage: None,
 774        }
 775    }
 776
    /// Returns the agent connection held by this thread.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
 780
    /// Returns the action log associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 784
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 788
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
 792
    /// Returns the thread's entries, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
 796
    /// Returns the protocol session id of this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
 800
 801    pub fn status(&self) -> ThreadStatus {
 802        if self.send_task.is_some() {
 803            if self.waiting_for_tool_confirmation() {
 804                ThreadStatus::WaitingForToolConfirmation
 805            } else {
 806                ThreadStatus::Generating
 807            }
 808        } else {
 809            ThreadStatus::Idle
 810        }
 811    }
 812
    /// Returns the latest token usage, if any has been recorded.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
 816
 817    pub fn has_pending_edit_tool_calls(&self) -> bool {
 818        for entry in self.entries.iter().rev() {
 819            match entry {
 820                AgentThreadEntry::UserMessage(_) => return false,
 821                AgentThreadEntry::ToolCall(
 822                    call @ ToolCall {
 823                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 824                        ..
 825                    },
 826                ) if call.diffs().next().is_some() => {
 827                    return true;
 828                }
 829                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 830            }
 831        }
 832
 833        false
 834    }
 835
 836    pub fn used_tools_since_last_user_message(&self) -> bool {
 837        for entry in self.entries.iter().rev() {
 838            match entry {
 839                AgentThreadEntry::UserMessage(..) => return false,
 840                AgentThreadEntry::AssistantMessage(..) => continue,
 841                AgentThreadEntry::ToolCall(..) => return true,
 842            }
 843        }
 844
 845        false
 846    }
 847
    /// Applies a protocol session update to the thread by dispatching on its
    /// variant.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while upserting or updating a tool call.
    pub fn handle_session_update(
        &mut self,
        update: acp::SessionUpdate,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        match update {
            acp::SessionUpdate::UserMessageChunk { content } => {
                // Chunks arriving through the session carry no message id.
                self.push_user_content_block(None, content, cx);
            }
            acp::SessionUpdate::AgentMessageChunk { content } => {
                // `false`: regular message content, not a thought.
                self.push_assistant_content_block(content, false, cx);
            }
            acp::SessionUpdate::AgentThoughtChunk { content } => {
                // `true`: the chunk is a thought.
                self.push_assistant_content_block(content, true, cx);
            }
            acp::SessionUpdate::ToolCall(tool_call) => {
                self.upsert_tool_call(tool_call, cx)?;
            }
            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
                self.update_tool_call(tool_call_update, cx)?;
            }
            acp::SessionUpdate::Plan(plan) => {
                self.update_plan(plan, cx);
            }
        }
        Ok(())
    }
 875
 876    pub fn push_user_content_block(
 877        &mut self,
 878        message_id: Option<UserMessageId>,
 879        chunk: acp::ContentBlock,
 880        cx: &mut Context<Self>,
 881    ) {
 882        let language_registry = self.project.read(cx).languages().clone();
 883        let entries_len = self.entries.len();
 884
 885        if let Some(last_entry) = self.entries.last_mut()
 886            && let AgentThreadEntry::UserMessage(UserMessage {
 887                id,
 888                content,
 889                chunks,
 890                ..
 891            }) = last_entry
 892        {
 893            *id = message_id.or(id.take());
 894            content.append(chunk.clone(), &language_registry, cx);
 895            chunks.push(chunk);
 896            let idx = entries_len - 1;
 897            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 898        } else {
 899            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 900            self.push_entry(
 901                AgentThreadEntry::UserMessage(UserMessage {
 902                    id: message_id,
 903                    content,
 904                    chunks: vec![chunk],
 905                    checkpoint: None,
 906                }),
 907                cx,
 908            );
 909        }
 910    }
 911
 912    pub fn push_assistant_content_block(
 913        &mut self,
 914        chunk: acp::ContentBlock,
 915        is_thought: bool,
 916        cx: &mut Context<Self>,
 917    ) {
 918        let language_registry = self.project.read(cx).languages().clone();
 919        let entries_len = self.entries.len();
 920        if let Some(last_entry) = self.entries.last_mut()
 921            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 922        {
 923            let idx = entries_len - 1;
 924            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 925            match (chunks.last_mut(), is_thought) {
 926                (Some(AssistantMessageChunk::Message { block }), false)
 927                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 928                    block.append(chunk, &language_registry, cx)
 929                }
 930                _ => {
 931                    let block = ContentBlock::new(chunk, &language_registry, cx);
 932                    if is_thought {
 933                        chunks.push(AssistantMessageChunk::Thought { block })
 934                    } else {
 935                        chunks.push(AssistantMessageChunk::Message { block })
 936                    }
 937                }
 938            }
 939        } else {
 940            let block = ContentBlock::new(chunk, &language_registry, cx);
 941            let chunk = if is_thought {
 942                AssistantMessageChunk::Thought { block }
 943            } else {
 944                AssistantMessageChunk::Message { block }
 945            };
 946
 947            self.push_entry(
 948                AgentThreadEntry::AssistantMessage(AssistantMessage {
 949                    chunks: vec![chunk],
 950                }),
 951                cx,
 952            );
 953        }
 954    }
 955
 956    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 957        self.entries.push(entry);
 958        cx.emit(AcpThreadEvent::NewEntry);
 959    }
 960
 961    pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
 962        self.title = title;
 963        cx.emit(AcpThreadEvent::TitleUpdated);
 964        Ok(())
 965    }
 966
 967    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
 968        self.token_usage = usage;
 969        cx.emit(AcpThreadEvent::TokenUsageUpdated);
 970    }
 971
 972    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
 973        cx.emit(AcpThreadEvent::Retry(status));
 974    }
 975
 976    pub fn update_tool_call(
 977        &mut self,
 978        update: impl Into<ToolCallUpdate>,
 979        cx: &mut Context<Self>,
 980    ) -> Result<()> {
 981        let update = update.into();
 982        let languages = self.project.read(cx).languages().clone();
 983
 984        let (ix, current_call) = self
 985            .tool_call_mut(update.id())
 986            .context("Tool call not found")?;
 987        match update {
 988            ToolCallUpdate::UpdateFields(update) => {
 989                let location_updated = update.fields.locations.is_some();
 990                current_call.update_fields(update.fields, languages, cx);
 991                if location_updated {
 992                    self.resolve_locations(update.id.clone(), cx);
 993                }
 994            }
 995            ToolCallUpdate::UpdateDiff(update) => {
 996                current_call.content.clear();
 997                current_call
 998                    .content
 999                    .push(ToolCallContent::Diff(update.diff));
1000            }
1001            ToolCallUpdate::UpdateTerminal(update) => {
1002                current_call.content.clear();
1003                current_call
1004                    .content
1005                    .push(ToolCallContent::Terminal(update.terminal));
1006            }
1007        }
1008
1009        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1010
1011        Ok(())
1012    }
1013
1014    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1015    pub fn upsert_tool_call(
1016        &mut self,
1017        tool_call: acp::ToolCall,
1018        cx: &mut Context<Self>,
1019    ) -> Result<(), acp::Error> {
1020        let status = tool_call.status.into();
1021        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1022    }
1023
1024    /// Fails if id does not match an existing entry.
1025    pub fn upsert_tool_call_inner(
1026        &mut self,
1027        tool_call_update: acp::ToolCallUpdate,
1028        status: ToolCallStatus,
1029        cx: &mut Context<Self>,
1030    ) -> Result<(), acp::Error> {
1031        let language_registry = self.project.read(cx).languages().clone();
1032        let id = tool_call_update.id.clone();
1033
1034        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1035            current_call.update_fields(tool_call_update.fields, language_registry, cx);
1036            current_call.status = status;
1037
1038            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1039        } else {
1040            let call =
1041                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1042            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1043        };
1044
1045        self.resolve_locations(id, cx);
1046        Ok(())
1047    }
1048
1049    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1050        // The tool call we are looking for is typically the last one, or very close to the end.
1051        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1052        self.entries
1053            .iter_mut()
1054            .enumerate()
1055            .rev()
1056            .find_map(|(index, tool_call)| {
1057                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1058                    && &tool_call.id == id
1059                {
1060                    Some((index, tool_call))
1061                } else {
1062                    None
1063                }
1064            })
1065    }
1066
1067    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1068        self.entries
1069            .iter()
1070            .enumerate()
1071            .rev()
1072            .find_map(|(index, tool_call)| {
1073                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1074                    && &tool_call.id == id
1075                {
1076                    Some((index, tool_call))
1077                } else {
1078                    None
1079                }
1080            })
1081    }
1082
1083    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1084        let project = self.project.clone();
1085        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1086            return;
1087        };
1088        let task = tool_call.resolve_locations(project, cx);
1089        cx.spawn(async move |this, cx| {
1090            let resolved_locations = task.await;
1091            this.update(cx, |this, cx| {
1092                let project = this.project.clone();
1093                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1094                    return;
1095                };
1096                if let Some(Some(location)) = resolved_locations.last() {
1097                    project.update(cx, |project, cx| {
1098                        if let Some(agent_location) = project.agent_location() {
1099                            let should_ignore = agent_location.buffer == location.buffer
1100                                && location
1101                                    .buffer
1102                                    .update(cx, |buffer, _| {
1103                                        let snapshot = buffer.snapshot();
1104                                        let old_position =
1105                                            agent_location.position.to_point(&snapshot);
1106                                        let new_position = location.position.to_point(&snapshot);
1107                                        // ignore this so that when we get updates from the edit tool
1108                                        // the position doesn't reset to the startof line
1109                                        old_position.row == new_position.row
1110                                            && old_position.column > new_position.column
1111                                    })
1112                                    .ok()
1113                                    .unwrap_or_default();
1114                            if !should_ignore {
1115                                project.set_agent_location(Some(location.clone()), cx);
1116                            }
1117                        }
1118                    });
1119                }
1120                if tool_call.resolved_locations != resolved_locations {
1121                    tool_call.resolved_locations = resolved_locations;
1122                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1123                }
1124            })
1125        })
1126        .detach();
1127    }
1128
1129    pub fn request_tool_call_authorization(
1130        &mut self,
1131        tool_call: acp::ToolCallUpdate,
1132        options: Vec<acp::PermissionOption>,
1133        cx: &mut Context<Self>,
1134    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1135        let (tx, rx) = oneshot::channel();
1136
1137        let status = ToolCallStatus::WaitingForConfirmation {
1138            options,
1139            respond_tx: tx,
1140        };
1141
1142        self.upsert_tool_call_inner(tool_call, status, cx)?;
1143        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1144        Ok(rx)
1145    }
1146
1147    pub fn authorize_tool_call(
1148        &mut self,
1149        id: acp::ToolCallId,
1150        option_id: acp::PermissionOptionId,
1151        option_kind: acp::PermissionOptionKind,
1152        cx: &mut Context<Self>,
1153    ) {
1154        let Some((ix, call)) = self.tool_call_mut(&id) else {
1155            return;
1156        };
1157
1158        let new_status = match option_kind {
1159            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1160                ToolCallStatus::Rejected
1161            }
1162            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1163                ToolCallStatus::InProgress
1164            }
1165        };
1166
1167        let curr_status = mem::replace(&mut call.status, new_status);
1168
1169        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1170            respond_tx.send(option_id).log_err();
1171        } else if cfg!(debug_assertions) {
1172            panic!("tried to authorize an already authorized tool call");
1173        }
1174
1175        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1176    }
1177
1178    /// Returns true if the last turn is awaiting tool authorization
1179    pub fn waiting_for_tool_confirmation(&self) -> bool {
1180        for entry in self.entries.iter().rev() {
1181            match &entry {
1182                AgentThreadEntry::ToolCall(call) => match call.status {
1183                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1184                    ToolCallStatus::Pending
1185                    | ToolCallStatus::InProgress
1186                    | ToolCallStatus::Completed
1187                    | ToolCallStatus::Failed
1188                    | ToolCallStatus::Rejected
1189                    | ToolCallStatus::Canceled => continue,
1190                },
1191                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1192                    // Reached the beginning of the turn
1193                    return false;
1194                }
1195            }
1196        }
1197        false
1198    }
1199
1200    pub fn plan(&self) -> &Plan {
1201        &self.plan
1202    }
1203
1204    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1205        let new_entries_len = request.entries.len();
1206        let mut new_entries = request.entries.into_iter();
1207
1208        // Reuse existing markdown to prevent flickering
1209        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1210            let PlanEntry {
1211                content,
1212                priority,
1213                status,
1214            } = old;
1215            content.update(cx, |old, cx| {
1216                old.replace(new.content, cx);
1217            });
1218            *priority = new.priority;
1219            *status = new.status;
1220        }
1221        for new in new_entries {
1222            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1223        }
1224        self.plan.entries.truncate(new_entries_len);
1225
1226        cx.notify();
1227    }
1228
1229    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1230        self.plan
1231            .entries
1232            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1233        cx.notify();
1234    }
1235
1236    #[cfg(any(test, feature = "test-support"))]
1237    pub fn send_raw(
1238        &mut self,
1239        message: &str,
1240        cx: &mut Context<Self>,
1241    ) -> BoxFuture<'static, Result<()>> {
1242        self.send(
1243            vec![acp::ContentBlock::Text(acp::TextContent {
1244                text: message.to_string(),
1245                annotations: None,
1246            })],
1247            cx,
1248        )
1249    }
1250
1251    pub fn send(
1252        &mut self,
1253        message: Vec<acp::ContentBlock>,
1254        cx: &mut Context<Self>,
1255    ) -> BoxFuture<'static, Result<()>> {
1256        let block = ContentBlock::new_combined(
1257            message.clone(),
1258            self.project.read(cx).languages().clone(),
1259            cx,
1260        );
1261        let request = acp::PromptRequest {
1262            prompt: message.clone(),
1263            session_id: self.session_id.clone(),
1264        };
1265        let git_store = self.project.read(cx).git_store().clone();
1266
1267        let message_id = if self
1268            .connection
1269            .session_editor(&self.session_id, cx)
1270            .is_some()
1271        {
1272            Some(UserMessageId::new())
1273        } else {
1274            None
1275        };
1276
1277        self.run_turn(cx, async move |this, cx| {
1278            this.update(cx, |this, cx| {
1279                this.push_entry(
1280                    AgentThreadEntry::UserMessage(UserMessage {
1281                        id: message_id.clone(),
1282                        content: block,
1283                        chunks: message,
1284                        checkpoint: None,
1285                    }),
1286                    cx,
1287                );
1288            })
1289            .ok();
1290
1291            let old_checkpoint = git_store
1292                .update(cx, |git, cx| git.checkpoint(cx))?
1293                .await
1294                .context("failed to get old checkpoint")
1295                .log_err();
1296            this.update(cx, |this, cx| {
1297                if let Some((_ix, message)) = this.last_user_message() {
1298                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1299                        git_checkpoint,
1300                        show: false,
1301                    });
1302                }
1303                this.connection.prompt(message_id, request, cx)
1304            })?
1305            .await
1306        })
1307    }
1308
1309    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1310        self.run_turn(cx, async move |this, cx| {
1311            this.update(cx, |this, cx| {
1312                this.connection
1313                    .resume(&this.session_id, cx)
1314                    .map(|resume| resume.run(cx))
1315            })?
1316            .context("resuming a session is not supported")?
1317            .await
1318        })
1319    }
1320
1321    fn run_turn(
1322        &mut self,
1323        cx: &mut Context<Self>,
1324        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1325    ) -> BoxFuture<'static, Result<()>> {
1326        self.clear_completed_plan_entries(cx);
1327
1328        let (tx, rx) = oneshot::channel();
1329        let cancel_task = self.cancel(cx);
1330
1331        self.send_task = Some(cx.spawn(async move |this, cx| {
1332            cancel_task.await;
1333            tx.send(f(this, cx).await).ok();
1334        }));
1335
1336        cx.spawn(async move |this, cx| {
1337            let response = rx.await;
1338
1339            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1340                .await?;
1341
1342            this.update(cx, |this, cx| {
1343                this.project
1344                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1345                match response {
1346                    Ok(Err(e)) => {
1347                        this.send_task.take();
1348                        cx.emit(AcpThreadEvent::Error);
1349                        Err(e)
1350                    }
1351                    result => {
1352                        let canceled = matches!(
1353                            result,
1354                            Ok(Ok(acp::PromptResponse {
1355                                stop_reason: acp::StopReason::Canceled
1356                            }))
1357                        );
1358
1359                        // We only take the task if the current prompt wasn't canceled.
1360                        //
1361                        // This prompt may have been canceled because another one was sent
1362                        // while it was still generating. In these cases, dropping `send_task`
1363                        // would cause the next generation to be canceled.
1364                        if !canceled {
1365                            this.send_task.take();
1366                        }
1367
1368                        cx.emit(AcpThreadEvent::Stopped);
1369                        Ok(())
1370                    }
1371                }
1372            })?
1373        })
1374        .boxed()
1375    }
1376
1377    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1378        let Some(send_task) = self.send_task.take() else {
1379            return Task::ready(());
1380        };
1381
1382        for entry in self.entries.iter_mut() {
1383            if let AgentThreadEntry::ToolCall(call) = entry {
1384                let cancel = matches!(
1385                    call.status,
1386                    ToolCallStatus::Pending
1387                        | ToolCallStatus::WaitingForConfirmation { .. }
1388                        | ToolCallStatus::InProgress
1389                );
1390
1391                if cancel {
1392                    call.status = ToolCallStatus::Canceled;
1393                }
1394            }
1395        }
1396
1397        self.connection.cancel(&self.session_id, cx);
1398
1399        // Wait for the send task to complete
1400        cx.foreground_executor().spawn(send_task)
1401    }
1402
1403    /// Rewinds this thread to before the entry at `index`, removing it and all
1404    /// subsequent entries while reverting any changes made from that point.
1405    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1406        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1407            return Task::ready(Err(anyhow!("not supported")));
1408        };
1409        let Some(message) = self.user_message(&id) else {
1410            return Task::ready(Err(anyhow!("message not found")));
1411        };
1412
1413        let checkpoint = message
1414            .checkpoint
1415            .as_ref()
1416            .map(|c| c.git_checkpoint.clone());
1417
1418        let git_store = self.project.read(cx).git_store().clone();
1419        cx.spawn(async move |this, cx| {
1420            if let Some(checkpoint) = checkpoint {
1421                git_store
1422                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1423                    .await?;
1424            }
1425
1426            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1427                .await?;
1428            this.update(cx, |this, cx| {
1429                if let Some((ix, _)) = this.user_message_mut(&id) {
1430                    let range = ix..this.entries.len();
1431                    this.entries.truncate(ix);
1432                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1433                }
1434            })
1435        })
1436    }
1437
1438    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1439        let git_store = self.project.read(cx).git_store().clone();
1440
1441        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1442            if let Some(checkpoint) = message.checkpoint.as_ref() {
1443                checkpoint.git_checkpoint.clone()
1444            } else {
1445                return Task::ready(Ok(()));
1446            }
1447        } else {
1448            return Task::ready(Ok(()));
1449        };
1450
1451        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1452        cx.spawn(async move |this, cx| {
1453            let new_checkpoint = new_checkpoint
1454                .await
1455                .context("failed to get new checkpoint")
1456                .log_err();
1457            if let Some(new_checkpoint) = new_checkpoint {
1458                let equal = git_store
1459                    .update(cx, |git, cx| {
1460                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1461                    })?
1462                    .await
1463                    .unwrap_or(true);
1464                this.update(cx, |this, cx| {
1465                    let (ix, message) = this.last_user_message().context("no user message")?;
1466                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1467                    checkpoint.show = !equal;
1468                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1469                    anyhow::Ok(())
1470                })??;
1471            }
1472
1473            Ok(())
1474        })
1475    }
1476
1477    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1478        self.entries
1479            .iter_mut()
1480            .enumerate()
1481            .rev()
1482            .find_map(|(ix, entry)| {
1483                if let AgentThreadEntry::UserMessage(message) = entry {
1484                    Some((ix, message))
1485                } else {
1486                    None
1487                }
1488            })
1489    }
1490
1491    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1492        self.entries.iter().find_map(|entry| {
1493            if let AgentThreadEntry::UserMessage(message) = entry {
1494                if message.id.as_ref() == Some(id) {
1495                    Some(message)
1496                } else {
1497                    None
1498                }
1499            } else {
1500                None
1501            }
1502        })
1503    }
1504
1505    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1506        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1507            if let AgentThreadEntry::UserMessage(message) = entry {
1508                if message.id.as_ref() == Some(id) {
1509                    Some((ix, message))
1510                } else {
1511                    None
1512                }
1513            } else {
1514                None
1515            }
1516        })
1517    }
1518
1519    pub fn read_text_file(
1520        &self,
1521        path: PathBuf,
1522        line: Option<u32>,
1523        limit: Option<u32>,
1524        reuse_shared_snapshot: bool,
1525        cx: &mut Context<Self>,
1526    ) -> Task<Result<String>> {
1527        let project = self.project.clone();
1528        let action_log = self.action_log.clone();
1529        cx.spawn(async move |this, cx| {
1530            let load = project.update(cx, |project, cx| {
1531                let path = project
1532                    .project_path_for_absolute_path(&path, cx)
1533                    .context("invalid path")?;
1534                anyhow::Ok(project.open_buffer(path, cx))
1535            });
1536            let buffer = load??.await?;
1537
1538            let snapshot = if reuse_shared_snapshot {
1539                this.read_with(cx, |this, _| {
1540                    this.shared_buffers.get(&buffer.clone()).cloned()
1541                })
1542                .log_err()
1543                .flatten()
1544            } else {
1545                None
1546            };
1547
1548            let snapshot = if let Some(snapshot) = snapshot {
1549                snapshot
1550            } else {
1551                action_log.update(cx, |action_log, cx| {
1552                    action_log.buffer_read(buffer.clone(), cx);
1553                })?;
1554                project.update(cx, |project, cx| {
1555                    let position = buffer
1556                        .read(cx)
1557                        .snapshot()
1558                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1559                    project.set_agent_location(
1560                        Some(AgentLocation {
1561                            buffer: buffer.downgrade(),
1562                            position,
1563                        }),
1564                        cx,
1565                    );
1566                })?;
1567
1568                buffer.update(cx, |buffer, _| buffer.snapshot())?
1569            };
1570
1571            this.update(cx, |this, _| {
1572                let text = snapshot.text();
1573                this.shared_buffers.insert(buffer.clone(), snapshot);
1574                if line.is_none() && limit.is_none() {
1575                    return Ok(text);
1576                }
1577                let limit = limit.unwrap_or(u32::MAX) as usize;
1578                let Some(line) = line else {
1579                    return Ok(text.lines().take(limit).collect::<String>());
1580                };
1581
1582                let count = text.lines().count();
1583                if count < line as usize {
1584                    anyhow::bail!("There are only {} lines", count);
1585                }
1586                Ok(text
1587                    .lines()
1588                    .skip(line as usize + 1)
1589                    .take(limit)
1590                    .collect::<String>())
1591            })?
1592        })
1593    }
1594
1595    pub fn write_text_file(
1596        &self,
1597        path: PathBuf,
1598        content: String,
1599        cx: &mut Context<Self>,
1600    ) -> Task<Result<()>> {
1601        let project = self.project.clone();
1602        let action_log = self.action_log.clone();
1603        cx.spawn(async move |this, cx| {
1604            let load = project.update(cx, |project, cx| {
1605                let path = project
1606                    .project_path_for_absolute_path(&path, cx)
1607                    .context("invalid path")?;
1608                anyhow::Ok(project.open_buffer(path, cx))
1609            });
1610            let buffer = load??.await?;
1611            let snapshot = this.update(cx, |this, cx| {
1612                this.shared_buffers
1613                    .get(&buffer)
1614                    .cloned()
1615                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1616            })?;
1617            let edits = cx
1618                .background_executor()
1619                .spawn(async move {
1620                    let old_text = snapshot.text();
1621                    text_diff(old_text.as_str(), &content)
1622                        .into_iter()
1623                        .map(|(range, replacement)| {
1624                            (
1625                                snapshot.anchor_after(range.start)
1626                                    ..snapshot.anchor_before(range.end),
1627                                replacement,
1628                            )
1629                        })
1630                        .collect::<Vec<_>>()
1631                })
1632                .await;
1633
1634            project.update(cx, |project, cx| {
1635                project.set_agent_location(
1636                    Some(AgentLocation {
1637                        buffer: buffer.downgrade(),
1638                        position: edits
1639                            .last()
1640                            .map(|(range, _)| range.end)
1641                            .unwrap_or(Anchor::MIN),
1642                    }),
1643                    cx,
1644                );
1645            })?;
1646
1647            let format_on_save = cx.update(|cx| {
1648                action_log.update(cx, |action_log, cx| {
1649                    action_log.buffer_read(buffer.clone(), cx);
1650                });
1651
1652                let format_on_save = buffer.update(cx, |buffer, cx| {
1653                    buffer.edit(edits, None, cx);
1654
1655                    let settings = language::language_settings::language_settings(
1656                        buffer.language().map(|l| l.name()),
1657                        buffer.file(),
1658                        cx,
1659                    );
1660
1661                    settings.format_on_save != FormatOnSave::Off
1662                });
1663                action_log.update(cx, |action_log, cx| {
1664                    action_log.buffer_edited(buffer.clone(), cx);
1665                });
1666                format_on_save
1667            })?;
1668
1669            if format_on_save {
1670                let format_task = project.update(cx, |project, cx| {
1671                    project.format(
1672                        HashSet::from_iter([buffer.clone()]),
1673                        LspFormatTarget::Buffers,
1674                        false,
1675                        FormatTrigger::Save,
1676                        cx,
1677                    )
1678                })?;
1679                format_task.await.log_err();
1680
1681                action_log.update(cx, |action_log, cx| {
1682                    action_log.buffer_edited(buffer.clone(), cx);
1683                })?;
1684            }
1685
1686            project
1687                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1688                .await
1689        })
1690    }
1691
1692    pub fn to_markdown(&self, cx: &App) -> String {
1693        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1694    }
1695
1696    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
1697        cx.emit(AcpThreadEvent::LoadError(error));
1698    }
1699}
1700
1701fn markdown_for_raw_output(
1702    raw_output: &serde_json::Value,
1703    language_registry: &Arc<LanguageRegistry>,
1704    cx: &mut App,
1705) -> Option<Entity<Markdown>> {
1706    match raw_output {
1707        serde_json::Value::Null => None,
1708        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1709            Markdown::new(
1710                value.to_string().into(),
1711                Some(language_registry.clone()),
1712                None,
1713                cx,
1714            )
1715        })),
1716        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1717            Markdown::new(
1718                value.to_string().into(),
1719                Some(language_registry.clone()),
1720                None,
1721                cx,
1722            )
1723        })),
1724        serde_json::Value::String(value) => Some(cx.new(|cx| {
1725            Markdown::new(
1726                value.clone().into(),
1727                Some(language_registry.clone()),
1728                None,
1729                cx,
1730            )
1731        })),
1732        value => Some(cx.new(|cx| {
1733            Markdown::new(
1734                format!("```json\n{}\n```", value).into(),
1735                Some(language_registry.clone()),
1736                None,
1737                cx,
1738            )
1739        })),
1740    }
1741}
1742
1743#[cfg(test)]
1744mod tests {
1745    use super::*;
1746    use anyhow::anyhow;
1747    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1748    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1749    use indoc::indoc;
1750    use project::{FakeFs, Fs};
1751    use rand::Rng as _;
1752    use serde_json::json;
1753    use settings::SettingsStore;
1754    use smol::stream::StreamExt as _;
1755    use std::{
1756        any::Any,
1757        cell::RefCell,
1758        path::Path,
1759        rc::Rc,
1760        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1761        time::Duration,
1762    };
1763    use util::path;
1764
1765    fn init_test(cx: &mut TestAppContext) {
1766        env_logger::try_init().ok();
1767        cx.update(|cx| {
1768            let settings_store = SettingsStore::test(cx);
1769            cx.set_global(settings_store);
1770            Project::init_settings(cx);
1771            language::init(cx);
1772        });
1773    }
1774
1775    #[gpui::test]
1776    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1777        init_test(cx);
1778
1779        let fs = FakeFs::new(cx.executor());
1780        let project = Project::test(fs, [], cx).await;
1781        let connection = Rc::new(FakeAgentConnection::new());
1782        let thread = cx
1783            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1784            .await
1785            .unwrap();
1786
1787        // Test creating a new user message
1788        thread.update(cx, |thread, cx| {
1789            thread.push_user_content_block(
1790                None,
1791                acp::ContentBlock::Text(acp::TextContent {
1792                    annotations: None,
1793                    text: "Hello, ".to_string(),
1794                }),
1795                cx,
1796            );
1797        });
1798
1799        thread.update(cx, |thread, cx| {
1800            assert_eq!(thread.entries.len(), 1);
1801            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1802                assert_eq!(user_msg.id, None);
1803                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1804            } else {
1805                panic!("Expected UserMessage");
1806            }
1807        });
1808
1809        // Test appending to existing user message
1810        let message_1_id = UserMessageId::new();
1811        thread.update(cx, |thread, cx| {
1812            thread.push_user_content_block(
1813                Some(message_1_id.clone()),
1814                acp::ContentBlock::Text(acp::TextContent {
1815                    annotations: None,
1816                    text: "world!".to_string(),
1817                }),
1818                cx,
1819            );
1820        });
1821
1822        thread.update(cx, |thread, cx| {
1823            assert_eq!(thread.entries.len(), 1);
1824            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1825                assert_eq!(user_msg.id, Some(message_1_id));
1826                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1827            } else {
1828                panic!("Expected UserMessage");
1829            }
1830        });
1831
1832        // Test creating new user message after assistant message
1833        thread.update(cx, |thread, cx| {
1834            thread.push_assistant_content_block(
1835                acp::ContentBlock::Text(acp::TextContent {
1836                    annotations: None,
1837                    text: "Assistant response".to_string(),
1838                }),
1839                false,
1840                cx,
1841            );
1842        });
1843
1844        let message_2_id = UserMessageId::new();
1845        thread.update(cx, |thread, cx| {
1846            thread.push_user_content_block(
1847                Some(message_2_id.clone()),
1848                acp::ContentBlock::Text(acp::TextContent {
1849                    annotations: None,
1850                    text: "New user message".to_string(),
1851                }),
1852                cx,
1853            );
1854        });
1855
1856        thread.update(cx, |thread, cx| {
1857            assert_eq!(thread.entries.len(), 3);
1858            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1859                assert_eq!(user_msg.id, Some(message_2_id));
1860                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1861            } else {
1862                panic!("Expected UserMessage at index 2");
1863            }
1864        });
1865    }
1866
1867    #[gpui::test]
1868    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1869        init_test(cx);
1870
1871        let fs = FakeFs::new(cx.executor());
1872        let project = Project::test(fs, [], cx).await;
1873        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1874            |_, thread, mut cx| {
1875                async move {
1876                    thread.update(&mut cx, |thread, cx| {
1877                        thread
1878                            .handle_session_update(
1879                                acp::SessionUpdate::AgentThoughtChunk {
1880                                    content: "Thinking ".into(),
1881                                },
1882                                cx,
1883                            )
1884                            .unwrap();
1885                        thread
1886                            .handle_session_update(
1887                                acp::SessionUpdate::AgentThoughtChunk {
1888                                    content: "hard!".into(),
1889                                },
1890                                cx,
1891                            )
1892                            .unwrap();
1893                    })?;
1894                    Ok(acp::PromptResponse {
1895                        stop_reason: acp::StopReason::EndTurn,
1896                    })
1897                }
1898                .boxed_local()
1899            },
1900        ));
1901
1902        let thread = cx
1903            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1904            .await
1905            .unwrap();
1906
1907        thread
1908            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1909            .await
1910            .unwrap();
1911
1912        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1913        assert_eq!(
1914            output,
1915            indoc! {r#"
1916            ## User
1917
1918            Hello from Zed!
1919
1920            ## Assistant
1921
1922            <thinking>
1923            Thinking hard!
1924            </thinking>
1925
1926            "#}
1927        );
1928    }
1929
1930    #[gpui::test]
1931    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1932        init_test(cx);
1933
1934        let fs = FakeFs::new(cx.executor());
1935        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1936            .await;
1937        let project = Project::test(fs.clone(), [], cx).await;
1938        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1939        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1940        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1941            move |_, thread, mut cx| {
1942                let read_file_tx = read_file_tx.clone();
1943                async move {
1944                    let content = thread
1945                        .update(&mut cx, |thread, cx| {
1946                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1947                        })
1948                        .unwrap()
1949                        .await
1950                        .unwrap();
1951                    assert_eq!(content, "one\ntwo\nthree\n");
1952                    read_file_tx.take().unwrap().send(()).unwrap();
1953                    thread
1954                        .update(&mut cx, |thread, cx| {
1955                            thread.write_text_file(
1956                                path!("/tmp/foo").into(),
1957                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1958                                cx,
1959                            )
1960                        })
1961                        .unwrap()
1962                        .await
1963                        .unwrap();
1964                    Ok(acp::PromptResponse {
1965                        stop_reason: acp::StopReason::EndTurn,
1966                    })
1967                }
1968                .boxed_local()
1969            },
1970        ));
1971
1972        let (worktree, pathbuf) = project
1973            .update(cx, |project, cx| {
1974                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1975            })
1976            .await
1977            .unwrap();
1978        let buffer = project
1979            .update(cx, |project, cx| {
1980                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1981            })
1982            .await
1983            .unwrap();
1984
1985        let thread = cx
1986            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1987            .await
1988            .unwrap();
1989
1990        let request = thread.update(cx, |thread, cx| {
1991            thread.send_raw("Extend the count in /tmp/foo", cx)
1992        });
1993        read_file_rx.await.ok();
1994        buffer.update(cx, |buffer, cx| {
1995            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1996        });
1997        cx.run_until_parked();
1998        assert_eq!(
1999            buffer.read_with(cx, |buffer, _| buffer.text()),
2000            "zero\none\ntwo\nthree\nfour\nfive\n"
2001        );
2002        assert_eq!(
2003            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2004            "zero\none\ntwo\nthree\nfour\nfive\n"
2005        );
2006        request.await.unwrap();
2007    }
2008
2009    #[gpui::test]
2010    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2011        init_test(cx);
2012
2013        let fs = FakeFs::new(cx.executor());
2014        let project = Project::test(fs, [], cx).await;
2015        let id = acp::ToolCallId("test".into());
2016
2017        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2018            let id = id.clone();
2019            move |_, thread, mut cx| {
2020                let id = id.clone();
2021                async move {
2022                    thread
2023                        .update(&mut cx, |thread, cx| {
2024                            thread.handle_session_update(
2025                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2026                                    id: id.clone(),
2027                                    title: "Label".into(),
2028                                    kind: acp::ToolKind::Fetch,
2029                                    status: acp::ToolCallStatus::InProgress,
2030                                    content: vec![],
2031                                    locations: vec![],
2032                                    raw_input: None,
2033                                    raw_output: None,
2034                                }),
2035                                cx,
2036                            )
2037                        })
2038                        .unwrap()
2039                        .unwrap();
2040                    Ok(acp::PromptResponse {
2041                        stop_reason: acp::StopReason::EndTurn,
2042                    })
2043                }
2044                .boxed_local()
2045            }
2046        }));
2047
2048        let thread = cx
2049            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2050            .await
2051            .unwrap();
2052
2053        let request = thread.update(cx, |thread, cx| {
2054            thread.send_raw("Fetch https://example.com", cx)
2055        });
2056
2057        run_until_first_tool_call(&thread, cx).await;
2058
2059        thread.read_with(cx, |thread, _| {
2060            assert!(matches!(
2061                thread.entries[1],
2062                AgentThreadEntry::ToolCall(ToolCall {
2063                    status: ToolCallStatus::InProgress,
2064                    ..
2065                })
2066            ));
2067        });
2068
2069        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2070
2071        thread.read_with(cx, |thread, _| {
2072            assert!(matches!(
2073                &thread.entries[1],
2074                AgentThreadEntry::ToolCall(ToolCall {
2075                    status: ToolCallStatus::Canceled,
2076                    ..
2077                })
2078            ));
2079        });
2080
2081        thread
2082            .update(cx, |thread, cx| {
2083                thread.handle_session_update(
2084                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2085                        id,
2086                        fields: acp::ToolCallUpdateFields {
2087                            status: Some(acp::ToolCallStatus::Completed),
2088                            ..Default::default()
2089                        },
2090                    }),
2091                    cx,
2092                )
2093            })
2094            .unwrap();
2095
2096        request.await.unwrap();
2097
2098        thread.read_with(cx, |thread, _| {
2099            assert!(matches!(
2100                thread.entries[1],
2101                AgentThreadEntry::ToolCall(ToolCall {
2102                    status: ToolCallStatus::Completed,
2103                    ..
2104                })
2105            ));
2106        });
2107    }
2108
2109    #[gpui::test]
2110    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2111        init_test(cx);
2112        let fs = FakeFs::new(cx.background_executor.clone());
2113        fs.insert_tree(path!("/test"), json!({})).await;
2114        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2115
2116        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2117            move |_, thread, mut cx| {
2118                async move {
2119                    thread
2120                        .update(&mut cx, |thread, cx| {
2121                            thread.handle_session_update(
2122                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2123                                    id: acp::ToolCallId("test".into()),
2124                                    title: "Label".into(),
2125                                    kind: acp::ToolKind::Edit,
2126                                    status: acp::ToolCallStatus::Completed,
2127                                    content: vec![acp::ToolCallContent::Diff {
2128                                        diff: acp::Diff {
2129                                            path: "/test/test.txt".into(),
2130                                            old_text: None,
2131                                            new_text: "foo".into(),
2132                                        },
2133                                    }],
2134                                    locations: vec![],
2135                                    raw_input: None,
2136                                    raw_output: None,
2137                                }),
2138                                cx,
2139                            )
2140                        })
2141                        .unwrap()
2142                        .unwrap();
2143                    Ok(acp::PromptResponse {
2144                        stop_reason: acp::StopReason::EndTurn,
2145                    })
2146                }
2147                .boxed_local()
2148            }
2149        }));
2150
2151        let thread = cx
2152            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2153            .await
2154            .unwrap();
2155
2156        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2157            .await
2158            .unwrap();
2159
2160        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2161    }
2162
2163    #[gpui::test(iterations = 10)]
2164    async fn test_checkpoints(cx: &mut TestAppContext) {
2165        init_test(cx);
2166        let fs = FakeFs::new(cx.background_executor.clone());
2167        fs.insert_tree(
2168            path!("/test"),
2169            json!({
2170                ".git": {}
2171            }),
2172        )
2173        .await;
2174        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2175
2176        let simulate_changes = Arc::new(AtomicBool::new(true));
2177        let next_filename = Arc::new(AtomicUsize::new(0));
2178        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2179            let simulate_changes = simulate_changes.clone();
2180            let next_filename = next_filename.clone();
2181            let fs = fs.clone();
2182            move |request, thread, mut cx| {
2183                let fs = fs.clone();
2184                let simulate_changes = simulate_changes.clone();
2185                let next_filename = next_filename.clone();
2186                async move {
2187                    if simulate_changes.load(SeqCst) {
2188                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2189                        fs.write(Path::new(&filename), b"").await?;
2190                    }
2191
2192                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2193                        panic!("expected text content block");
2194                    };
2195                    thread.update(&mut cx, |thread, cx| {
2196                        thread
2197                            .handle_session_update(
2198                                acp::SessionUpdate::AgentMessageChunk {
2199                                    content: content.text.to_uppercase().into(),
2200                                },
2201                                cx,
2202                            )
2203                            .unwrap();
2204                    })?;
2205                    Ok(acp::PromptResponse {
2206                        stop_reason: acp::StopReason::EndTurn,
2207                    })
2208                }
2209                .boxed_local()
2210            }
2211        }));
2212        let thread = cx
2213            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2214            .await
2215            .unwrap();
2216
2217        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2218            .await
2219            .unwrap();
2220        thread.read_with(cx, |thread, cx| {
2221            assert_eq!(
2222                thread.to_markdown(cx),
2223                indoc! {"
2224                    ## User (checkpoint)
2225
2226                    Lorem
2227
2228                    ## Assistant
2229
2230                    LOREM
2231
2232                "}
2233            );
2234        });
2235        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2236
2237        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2238            .await
2239            .unwrap();
2240        thread.read_with(cx, |thread, cx| {
2241            assert_eq!(
2242                thread.to_markdown(cx),
2243                indoc! {"
2244                    ## User (checkpoint)
2245
2246                    Lorem
2247
2248                    ## Assistant
2249
2250                    LOREM
2251
2252                    ## User (checkpoint)
2253
2254                    ipsum
2255
2256                    ## Assistant
2257
2258                    IPSUM
2259
2260                "}
2261            );
2262        });
2263        assert_eq!(
2264            fs.files(),
2265            vec![
2266                Path::new(path!("/test/file-0")),
2267                Path::new(path!("/test/file-1"))
2268            ]
2269        );
2270
2271        // Checkpoint isn't stored when there are no changes.
2272        simulate_changes.store(false, SeqCst);
2273        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2274            .await
2275            .unwrap();
2276        thread.read_with(cx, |thread, cx| {
2277            assert_eq!(
2278                thread.to_markdown(cx),
2279                indoc! {"
2280                    ## User (checkpoint)
2281
2282                    Lorem
2283
2284                    ## Assistant
2285
2286                    LOREM
2287
2288                    ## User (checkpoint)
2289
2290                    ipsum
2291
2292                    ## Assistant
2293
2294                    IPSUM
2295
2296                    ## User
2297
2298                    dolor
2299
2300                    ## Assistant
2301
2302                    DOLOR
2303
2304                "}
2305            );
2306        });
2307        assert_eq!(
2308            fs.files(),
2309            vec![
2310                Path::new(path!("/test/file-0")),
2311                Path::new(path!("/test/file-1"))
2312            ]
2313        );
2314
2315        // Rewinding the conversation truncates the history and restores the checkpoint.
2316        thread
2317            .update(cx, |thread, cx| {
2318                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2319                    panic!("unexpected entries {:?}", thread.entries)
2320                };
2321                thread.rewind(message.id.clone().unwrap(), cx)
2322            })
2323            .await
2324            .unwrap();
2325        thread.read_with(cx, |thread, cx| {
2326            assert_eq!(
2327                thread.to_markdown(cx),
2328                indoc! {"
2329                    ## User (checkpoint)
2330
2331                    Lorem
2332
2333                    ## Assistant
2334
2335                    LOREM
2336
2337                "}
2338            );
2339        });
2340        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2341    }
2342
2343    async fn run_until_first_tool_call(
2344        thread: &Entity<AcpThread>,
2345        cx: &mut TestAppContext,
2346    ) -> usize {
2347        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2348
2349        let subscription = cx.update(|cx| {
2350            cx.subscribe(thread, move |thread, _, cx| {
2351                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2352                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2353                        return tx.try_send(ix).unwrap();
2354                    }
2355                }
2356            })
2357        });
2358
2359        select! {
2360            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2361                panic!("Timeout waiting for tool call")
2362            }
2363            ix = rx.next().fuse() => {
2364                drop(subscription);
2365                ix.unwrap()
2366            }
2367        }
2368    }
2369
2370    #[derive(Clone, Default)]
2371    struct FakeAgentConnection {
2372        auth_methods: Vec<acp::AuthMethod>,
2373        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2374        on_user_message: Option<
2375            Rc<
2376                dyn Fn(
2377                        acp::PromptRequest,
2378                        WeakEntity<AcpThread>,
2379                        AsyncApp,
2380                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2381                    + 'static,
2382            >,
2383        >,
2384    }
2385
2386    impl FakeAgentConnection {
2387        fn new() -> Self {
2388            Self {
2389                auth_methods: Vec::new(),
2390                on_user_message: None,
2391                sessions: Arc::default(),
2392            }
2393        }
2394
2395        #[expect(unused)]
2396        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2397            self.auth_methods = auth_methods;
2398            self
2399        }
2400
2401        fn on_user_message(
2402            mut self,
2403            handler: impl Fn(
2404                acp::PromptRequest,
2405                WeakEntity<AcpThread>,
2406                AsyncApp,
2407            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2408            + 'static,
2409        ) -> Self {
2410            self.on_user_message.replace(Rc::new(handler));
2411            self
2412        }
2413    }
2414
2415    impl AgentConnection for FakeAgentConnection {
2416        fn auth_methods(&self) -> &[acp::AuthMethod] {
2417            &self.auth_methods
2418        }
2419
2420        fn new_thread(
2421            self: Rc<Self>,
2422            project: Entity<Project>,
2423            _cwd: &Path,
2424            cx: &mut App,
2425        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2426            let session_id = acp::SessionId(
2427                rand::thread_rng()
2428                    .sample_iter(&rand::distributions::Alphanumeric)
2429                    .take(7)
2430                    .map(char::from)
2431                    .collect::<String>()
2432                    .into(),
2433            );
2434            let action_log = cx.new(|_| ActionLog::new(project.clone()));
2435            let thread = cx.new(|_cx| {
2436                AcpThread::new(
2437                    "Test",
2438                    self.clone(),
2439                    project,
2440                    action_log,
2441                    session_id.clone(),
2442                )
2443            });
2444            self.sessions.lock().insert(session_id, thread.downgrade());
2445            Task::ready(Ok(thread))
2446        }
2447
2448        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2449            if self.auth_methods().iter().any(|m| m.id == method) {
2450                Task::ready(Ok(()))
2451            } else {
2452                Task::ready(Err(anyhow!("Invalid Auth Method")))
2453            }
2454        }
2455
2456        fn prompt(
2457            &self,
2458            _id: Option<UserMessageId>,
2459            params: acp::PromptRequest,
2460            cx: &mut App,
2461        ) -> Task<gpui::Result<acp::PromptResponse>> {
2462            let sessions = self.sessions.lock();
2463            let thread = sessions.get(&params.session_id).unwrap();
2464            if let Some(handler) = &self.on_user_message {
2465                let handler = handler.clone();
2466                let thread = thread.clone();
2467                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2468            } else {
2469                Task::ready(Ok(acp::PromptResponse {
2470                    stop_reason: acp::StopReason::EndTurn,
2471                }))
2472            }
2473        }
2474
2475        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2476            let sessions = self.sessions.lock();
2477            let thread = sessions.get(session_id).unwrap().clone();
2478
2479            cx.spawn(async move |cx| {
2480                thread
2481                    .update(cx, |thread, cx| thread.cancel(cx))
2482                    .unwrap()
2483                    .await
2484            })
2485            .detach();
2486        }
2487
2488        fn session_editor(
2489            &self,
2490            session_id: &acp::SessionId,
2491            _cx: &mut App,
2492        ) -> Option<Rc<dyn AgentSessionEditor>> {
2493            Some(Rc::new(FakeAgentSessionEditor {
2494                _session_id: session_id.clone(),
2495            }))
2496        }
2497
2498        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2499            self
2500        }
2501    }
2502
2503    struct FakeAgentSessionEditor {
2504        _session_id: acp::SessionId,
2505    }
2506
2507    impl AgentSessionEditor for FakeAgentSessionEditor {
2508        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2509            Task::ready(Ok(()))
2510        }
2511    }
2512}