acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6use collections::HashSet;
   7pub use connection::*;
   8pub use diff::*;
   9use language::language_settings::FormatOnSave;
  10pub use mention::*;
  11use project::lsp_store::{FormatTrigger, LspFormatTarget};
  12use serde::{Deserialize, Serialize};
  13pub use terminal::*;
  14
  15use action_log::ActionLog;
  16use agent_client_protocol as acp;
  17use anyhow::{Context as _, Result, anyhow};
  18use editor::Bias;
  19use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  20use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  21use itertools::Itertools;
  22use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  23use markdown::Markdown;
  24use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  25use std::collections::HashMap;
  26use std::error::Error;
  27use std::fmt::{Formatter, Write};
  28use std::ops::Range;
  29use std::process::ExitStatus;
  30use std::rc::Rc;
  31use std::time::{Duration, Instant};
  32use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  33use ui::App;
  34use util::ResultExt;
  35
  36#[derive(Debug)]
  37pub struct UserMessage {
  38    pub id: Option<UserMessageId>,
  39    pub content: ContentBlock,
  40    pub chunks: Vec<acp::ContentBlock>,
  41    pub checkpoint: Option<Checkpoint>,
  42}
  43
  44#[derive(Debug)]
  45pub struct Checkpoint {
  46    git_checkpoint: GitStoreCheckpoint,
  47    pub show: bool,
  48}
  49
  50impl UserMessage {
  51    fn to_markdown(&self, cx: &App) -> String {
  52        let mut markdown = String::new();
  53        if self
  54            .checkpoint
  55            .as_ref()
  56            .is_some_and(|checkpoint| checkpoint.show)
  57        {
  58            writeln!(markdown, "## User (checkpoint)").unwrap();
  59        } else {
  60            writeln!(markdown, "## User").unwrap();
  61        }
  62        writeln!(markdown).unwrap();
  63        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  64        writeln!(markdown).unwrap();
  65        markdown
  66    }
  67}
  68
  69#[derive(Debug, PartialEq)]
  70pub struct AssistantMessage {
  71    pub chunks: Vec<AssistantMessageChunk>,
  72}
  73
  74impl AssistantMessage {
  75    pub fn to_markdown(&self, cx: &App) -> String {
  76        format!(
  77            "## Assistant\n\n{}\n\n",
  78            self.chunks
  79                .iter()
  80                .map(|chunk| chunk.to_markdown(cx))
  81                .join("\n\n")
  82        )
  83    }
  84}
  85
  86#[derive(Debug, PartialEq)]
  87pub enum AssistantMessageChunk {
  88    Message { block: ContentBlock },
  89    Thought { block: ContentBlock },
  90}
  91
  92impl AssistantMessageChunk {
  93    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  94        Self::Message {
  95            block: ContentBlock::new(chunk.into(), language_registry, cx),
  96        }
  97    }
  98
  99    fn to_markdown(&self, cx: &App) -> String {
 100        match self {
 101            Self::Message { block } => block.to_markdown(cx).to_string(),
 102            Self::Thought { block } => {
 103                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 104            }
 105        }
 106    }
 107}
 108
 109#[derive(Debug)]
 110pub enum AgentThreadEntry {
 111    UserMessage(UserMessage),
 112    AssistantMessage(AssistantMessage),
 113    ToolCall(ToolCall),
 114}
 115
 116impl AgentThreadEntry {
 117    pub fn to_markdown(&self, cx: &App) -> String {
 118        match self {
 119            Self::UserMessage(message) => message.to_markdown(cx),
 120            Self::AssistantMessage(message) => message.to_markdown(cx),
 121            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 122        }
 123    }
 124
 125    pub fn user_message(&self) -> Option<&UserMessage> {
 126        if let AgentThreadEntry::UserMessage(message) = self {
 127            Some(message)
 128        } else {
 129            None
 130        }
 131    }
 132
 133    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 134        if let AgentThreadEntry::ToolCall(call) = self {
 135            itertools::Either::Left(call.diffs())
 136        } else {
 137            itertools::Either::Right(std::iter::empty())
 138        }
 139    }
 140
 141    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 142        if let AgentThreadEntry::ToolCall(call) = self {
 143            itertools::Either::Left(call.terminals())
 144        } else {
 145            itertools::Either::Right(std::iter::empty())
 146        }
 147    }
 148
 149    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 150        if let AgentThreadEntry::ToolCall(ToolCall {
 151            locations,
 152            resolved_locations,
 153            ..
 154        }) = self
 155        {
 156            Some((
 157                locations.get(ix)?.clone(),
 158                resolved_locations.get(ix)?.clone()?,
 159            ))
 160        } else {
 161            None
 162        }
 163    }
 164}
 165
 166#[derive(Debug)]
 167pub struct ToolCall {
 168    pub id: acp::ToolCallId,
 169    pub label: Entity<Markdown>,
 170    pub kind: acp::ToolKind,
 171    pub content: Vec<ToolCallContent>,
 172    pub status: ToolCallStatus,
 173    pub locations: Vec<acp::ToolCallLocation>,
 174    pub resolved_locations: Vec<Option<AgentLocation>>,
 175    pub raw_input: Option<serde_json::Value>,
 176    pub raw_output: Option<serde_json::Value>,
 177}
 178
 179impl ToolCall {
 180    fn from_acp(
 181        tool_call: acp::ToolCall,
 182        status: ToolCallStatus,
 183        language_registry: Arc<LanguageRegistry>,
 184        cx: &mut App,
 185    ) -> Self {
 186        Self {
 187            id: tool_call.id,
 188            label: cx.new(|cx| {
 189                Markdown::new(
 190                    tool_call.title.into(),
 191                    Some(language_registry.clone()),
 192                    None,
 193                    cx,
 194                )
 195            }),
 196            kind: tool_call.kind,
 197            content: tool_call
 198                .content
 199                .into_iter()
 200                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 201                .collect(),
 202            locations: tool_call.locations,
 203            resolved_locations: Vec::default(),
 204            status,
 205            raw_input: tool_call.raw_input,
 206            raw_output: tool_call.raw_output,
 207        }
 208    }
 209
 210    fn update_fields(
 211        &mut self,
 212        fields: acp::ToolCallUpdateFields,
 213        language_registry: Arc<LanguageRegistry>,
 214        cx: &mut App,
 215    ) {
 216        let acp::ToolCallUpdateFields {
 217            kind,
 218            status,
 219            title,
 220            content,
 221            locations,
 222            raw_input,
 223            raw_output,
 224        } = fields;
 225
 226        if let Some(kind) = kind {
 227            self.kind = kind;
 228        }
 229
 230        if let Some(status) = status {
 231            self.status = status.into();
 232        }
 233
 234        if let Some(title) = title {
 235            self.label.update(cx, |label, cx| {
 236                label.replace(title, cx);
 237            });
 238        }
 239
 240        if let Some(content) = content {
 241            self.content = content
 242                .into_iter()
 243                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 244                .collect();
 245        }
 246
 247        if let Some(locations) = locations {
 248            self.locations = locations;
 249        }
 250
 251        if let Some(raw_input) = raw_input {
 252            self.raw_input = Some(raw_input);
 253        }
 254
 255        if let Some(raw_output) = raw_output {
 256            if self.content.is_empty()
 257                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 258            {
 259                self.content
 260                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 261                        markdown,
 262                    }));
 263            }
 264            self.raw_output = Some(raw_output);
 265        }
 266    }
 267
 268    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 269        self.content.iter().filter_map(|content| match content {
 270            ToolCallContent::Diff(diff) => Some(diff),
 271            ToolCallContent::ContentBlock(_) => None,
 272            ToolCallContent::Terminal(_) => None,
 273        })
 274    }
 275
 276    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 277        self.content.iter().filter_map(|content| match content {
 278            ToolCallContent::Terminal(terminal) => Some(terminal),
 279            ToolCallContent::ContentBlock(_) => None,
 280            ToolCallContent::Diff(_) => None,
 281        })
 282    }
 283
 284    fn to_markdown(&self, cx: &App) -> String {
 285        let mut markdown = format!(
 286            "**Tool Call: {}**\nStatus: {}\n\n",
 287            self.label.read(cx).source(),
 288            self.status
 289        );
 290        for content in &self.content {
 291            markdown.push_str(content.to_markdown(cx).as_str());
 292            markdown.push_str("\n\n");
 293        }
 294        markdown
 295    }
 296
 297    async fn resolve_location(
 298        location: acp::ToolCallLocation,
 299        project: WeakEntity<Project>,
 300        cx: &mut AsyncApp,
 301    ) -> Option<AgentLocation> {
 302        let buffer = project
 303            .update(cx, |project, cx| {
 304                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 305                    Some(project.open_buffer(path, cx))
 306                } else {
 307                    None
 308                }
 309            })
 310            .ok()??;
 311        let buffer = buffer.await.log_err()?;
 312        let position = buffer
 313            .update(cx, |buffer, _| {
 314                if let Some(row) = location.line {
 315                    let snapshot = buffer.snapshot();
 316                    let column = snapshot.indent_size_for_line(row).len;
 317                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 318                    snapshot.anchor_before(point)
 319                } else {
 320                    Anchor::MIN
 321                }
 322            })
 323            .ok()?;
 324
 325        Some(AgentLocation {
 326            buffer: buffer.downgrade(),
 327            position,
 328        })
 329    }
 330
 331    fn resolve_locations(
 332        &self,
 333        project: Entity<Project>,
 334        cx: &mut App,
 335    ) -> Task<Vec<Option<AgentLocation>>> {
 336        let locations = self.locations.clone();
 337        project.update(cx, |_, cx| {
 338            cx.spawn(async move |project, cx| {
 339                let mut new_locations = Vec::new();
 340                for location in locations {
 341                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 342                }
 343                new_locations
 344            })
 345        })
 346    }
 347}
 348
 349#[derive(Debug)]
 350pub enum ToolCallStatus {
 351    /// The tool call hasn't started running yet, but we start showing it to
 352    /// the user.
 353    Pending,
 354    /// The tool call is waiting for confirmation from the user.
 355    WaitingForConfirmation {
 356        options: Vec<acp::PermissionOption>,
 357        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 358    },
 359    /// The tool call is currently running.
 360    InProgress,
 361    /// The tool call completed successfully.
 362    Completed,
 363    /// The tool call failed.
 364    Failed,
 365    /// The user rejected the tool call.
 366    Rejected,
 367    /// The user canceled generation so the tool call was canceled.
 368    Canceled,
 369}
 370
 371impl From<acp::ToolCallStatus> for ToolCallStatus {
 372    fn from(status: acp::ToolCallStatus) -> Self {
 373        match status {
 374            acp::ToolCallStatus::Pending => Self::Pending,
 375            acp::ToolCallStatus::InProgress => Self::InProgress,
 376            acp::ToolCallStatus::Completed => Self::Completed,
 377            acp::ToolCallStatus::Failed => Self::Failed,
 378        }
 379    }
 380}
 381
 382impl Display for ToolCallStatus {
 383    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 384        write!(
 385            f,
 386            "{}",
 387            match self {
 388                ToolCallStatus::Pending => "Pending",
 389                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 390                ToolCallStatus::InProgress => "In Progress",
 391                ToolCallStatus::Completed => "Completed",
 392                ToolCallStatus::Failed => "Failed",
 393                ToolCallStatus::Rejected => "Rejected",
 394                ToolCallStatus::Canceled => "Canceled",
 395            }
 396        )
 397    }
 398}
 399
 400#[derive(Debug, PartialEq, Clone)]
 401pub enum ContentBlock {
 402    Empty,
 403    Markdown { markdown: Entity<Markdown> },
 404    ResourceLink { resource_link: acp::ResourceLink },
 405}
 406
 407impl ContentBlock {
 408    pub fn new(
 409        block: acp::ContentBlock,
 410        language_registry: &Arc<LanguageRegistry>,
 411        cx: &mut App,
 412    ) -> Self {
 413        let mut this = Self::Empty;
 414        this.append(block, language_registry, cx);
 415        this
 416    }
 417
 418    pub fn new_combined(
 419        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 420        language_registry: Arc<LanguageRegistry>,
 421        cx: &mut App,
 422    ) -> Self {
 423        let mut this = Self::Empty;
 424        for block in blocks {
 425            this.append(block, &language_registry, cx);
 426        }
 427        this
 428    }
 429
 430    pub fn append(
 431        &mut self,
 432        block: acp::ContentBlock,
 433        language_registry: &Arc<LanguageRegistry>,
 434        cx: &mut App,
 435    ) {
 436        if matches!(self, ContentBlock::Empty)
 437            && let acp::ContentBlock::ResourceLink(resource_link) = block
 438        {
 439            *self = ContentBlock::ResourceLink { resource_link };
 440            return;
 441        }
 442
 443        let new_content = self.block_string_contents(block);
 444
 445        match self {
 446            ContentBlock::Empty => {
 447                *self = Self::create_markdown_block(new_content, language_registry, cx);
 448            }
 449            ContentBlock::Markdown { markdown } => {
 450                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 451            }
 452            ContentBlock::ResourceLink { resource_link } => {
 453                let existing_content = Self::resource_link_md(&resource_link.uri);
 454                let combined = format!("{}\n{}", existing_content, new_content);
 455
 456                *self = Self::create_markdown_block(combined, language_registry, cx);
 457            }
 458        }
 459    }
 460
 461    fn create_markdown_block(
 462        content: String,
 463        language_registry: &Arc<LanguageRegistry>,
 464        cx: &mut App,
 465    ) -> ContentBlock {
 466        ContentBlock::Markdown {
 467            markdown: cx
 468                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 469        }
 470    }
 471
 472    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 473        match block {
 474            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 475            acp::ContentBlock::ResourceLink(resource_link) => {
 476                Self::resource_link_md(&resource_link.uri)
 477            }
 478            acp::ContentBlock::Resource(acp::EmbeddedResource {
 479                resource:
 480                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 481                        uri,
 482                        ..
 483                    }),
 484                ..
 485            }) => Self::resource_link_md(&uri),
 486            acp::ContentBlock::Image(image) => Self::image_md(&image),
 487            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 488        }
 489    }
 490
 491    fn resource_link_md(uri: &str) -> String {
 492        if let Some(uri) = MentionUri::parse(uri).log_err() {
 493            uri.as_link().to_string()
 494        } else {
 495            uri.to_string()
 496        }
 497    }
 498
 499    fn image_md(_image: &acp::ImageContent) -> String {
 500        "`Image`".into()
 501    }
 502
 503    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 504        match self {
 505            ContentBlock::Empty => "",
 506            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 507            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 508        }
 509    }
 510
 511    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 512        match self {
 513            ContentBlock::Empty => None,
 514            ContentBlock::Markdown { markdown } => Some(markdown),
 515            ContentBlock::ResourceLink { .. } => None,
 516        }
 517    }
 518
 519    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 520        match self {
 521            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 522            _ => None,
 523        }
 524    }
 525}
 526
 527#[derive(Debug)]
 528pub enum ToolCallContent {
 529    ContentBlock(ContentBlock),
 530    Diff(Entity<Diff>),
 531    Terminal(Entity<Terminal>),
 532}
 533
 534impl ToolCallContent {
 535    pub fn from_acp(
 536        content: acp::ToolCallContent,
 537        language_registry: Arc<LanguageRegistry>,
 538        cx: &mut App,
 539    ) -> Self {
 540        match content {
 541            acp::ToolCallContent::Content { content } => {
 542                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 543            }
 544            acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
 545                Diff::finalized(
 546                    diff.path,
 547                    diff.old_text,
 548                    diff.new_text,
 549                    language_registry,
 550                    cx,
 551                )
 552            })),
 553        }
 554    }
 555
 556    pub fn to_markdown(&self, cx: &App) -> String {
 557        match self {
 558            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 559            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 560            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 561        }
 562    }
 563}
 564
 565#[derive(Debug, PartialEq)]
 566pub enum ToolCallUpdate {
 567    UpdateFields(acp::ToolCallUpdate),
 568    UpdateDiff(ToolCallUpdateDiff),
 569    UpdateTerminal(ToolCallUpdateTerminal),
 570}
 571
 572impl ToolCallUpdate {
 573    fn id(&self) -> &acp::ToolCallId {
 574        match self {
 575            Self::UpdateFields(update) => &update.id,
 576            Self::UpdateDiff(diff) => &diff.id,
 577            Self::UpdateTerminal(terminal) => &terminal.id,
 578        }
 579    }
 580}
 581
 582impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 583    fn from(update: acp::ToolCallUpdate) -> Self {
 584        Self::UpdateFields(update)
 585    }
 586}
 587
 588impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 589    fn from(diff: ToolCallUpdateDiff) -> Self {
 590        Self::UpdateDiff(diff)
 591    }
 592}
 593
 594#[derive(Debug, PartialEq)]
 595pub struct ToolCallUpdateDiff {
 596    pub id: acp::ToolCallId,
 597    pub diff: Entity<Diff>,
 598}
 599
 600impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 601    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 602        Self::UpdateTerminal(terminal)
 603    }
 604}
 605
 606#[derive(Debug, PartialEq)]
 607pub struct ToolCallUpdateTerminal {
 608    pub id: acp::ToolCallId,
 609    pub terminal: Entity<Terminal>,
 610}
 611
 612#[derive(Debug, Default)]
 613pub struct Plan {
 614    pub entries: Vec<PlanEntry>,
 615}
 616
 617#[derive(Debug)]
 618pub struct PlanStats<'a> {
 619    pub in_progress_entry: Option<&'a PlanEntry>,
 620    pub pending: u32,
 621    pub completed: u32,
 622}
 623
 624impl Plan {
 625    pub fn is_empty(&self) -> bool {
 626        self.entries.is_empty()
 627    }
 628
 629    pub fn stats(&self) -> PlanStats<'_> {
 630        let mut stats = PlanStats {
 631            in_progress_entry: None,
 632            pending: 0,
 633            completed: 0,
 634        };
 635
 636        for entry in &self.entries {
 637            match &entry.status {
 638                acp::PlanEntryStatus::Pending => {
 639                    stats.pending += 1;
 640                }
 641                acp::PlanEntryStatus::InProgress => {
 642                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 643                }
 644                acp::PlanEntryStatus::Completed => {
 645                    stats.completed += 1;
 646                }
 647            }
 648        }
 649
 650        stats
 651    }
 652}
 653
 654#[derive(Debug)]
 655pub struct PlanEntry {
 656    pub content: Entity<Markdown>,
 657    pub priority: acp::PlanEntryPriority,
 658    pub status: acp::PlanEntryStatus,
 659}
 660
 661impl PlanEntry {
 662    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 663        Self {
 664            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 665            priority: entry.priority,
 666            status: entry.status,
 667        }
 668    }
 669}
 670
 671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 672pub struct TokenUsage {
 673    pub max_tokens: u64,
 674    pub used_tokens: u64,
 675}
 676
 677#[derive(Debug, Clone)]
 678pub struct RetryStatus {
 679    pub last_error: SharedString,
 680    pub attempt: usize,
 681    pub max_attempts: usize,
 682    pub started_at: Instant,
 683    pub duration: Duration,
 684}
 685
 686pub struct AcpThread {
 687    title: SharedString,
 688    entries: Vec<AgentThreadEntry>,
 689    plan: Plan,
 690    project: Entity<Project>,
 691    action_log: Entity<ActionLog>,
 692    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 693    send_task: Option<Task<()>>,
 694    connection: Rc<dyn AgentConnection>,
 695    session_id: acp::SessionId,
 696    token_usage: Option<TokenUsage>,
 697}
 698
 699#[derive(Debug)]
 700pub enum AcpThreadEvent {
 701    NewEntry,
 702    TitleUpdated,
 703    TokenUsageUpdated,
 704    EntryUpdated(usize),
 705    EntriesRemoved(Range<usize>),
 706    ToolAuthorizationRequired,
 707    Retry(RetryStatus),
 708    Stopped,
 709    Error,
 710    ServerExited(ExitStatus),
 711}
 712
 713impl EventEmitter<AcpThreadEvent> for AcpThread {}
 714
 715#[derive(PartialEq, Eq)]
 716pub enum ThreadStatus {
 717    Idle,
 718    WaitingForToolConfirmation,
 719    Generating,
 720}
 721
 722#[derive(Debug, Clone)]
 723pub enum LoadError {
 724    Unsupported {
 725        error_message: SharedString,
 726        upgrade_message: SharedString,
 727        upgrade_command: String,
 728    },
 729    Exited(i32),
 730    Other(SharedString),
 731}
 732
 733impl Display for LoadError {
 734    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 735        match self {
 736            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 737            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 738            LoadError::Other(msg) => write!(f, "{}", msg),
 739        }
 740    }
 741}
 742
 743impl Error for LoadError {}
 744
 745impl AcpThread {
 746    pub fn new(
 747        title: impl Into<SharedString>,
 748        connection: Rc<dyn AgentConnection>,
 749        project: Entity<Project>,
 750        action_log: Entity<ActionLog>,
 751        session_id: acp::SessionId,
 752    ) -> Self {
 753        Self {
 754            action_log,
 755            shared_buffers: Default::default(),
 756            entries: Default::default(),
 757            plan: Default::default(),
 758            title: title.into(),
 759            project,
 760            send_task: None,
 761            connection,
 762            session_id,
 763            token_usage: None,
 764        }
 765    }
 766
 767    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 768        &self.connection
 769    }
 770
 771    pub fn action_log(&self) -> &Entity<ActionLog> {
 772        &self.action_log
 773    }
 774
 775    pub fn project(&self) -> &Entity<Project> {
 776        &self.project
 777    }
 778
 779    pub fn title(&self) -> SharedString {
 780        self.title.clone()
 781    }
 782
 783    pub fn entries(&self) -> &[AgentThreadEntry] {
 784        &self.entries
 785    }
 786
 787    pub fn session_id(&self) -> &acp::SessionId {
 788        &self.session_id
 789    }
 790
 791    pub fn status(&self) -> ThreadStatus {
 792        if self.send_task.is_some() {
 793            if self.waiting_for_tool_confirmation() {
 794                ThreadStatus::WaitingForToolConfirmation
 795            } else {
 796                ThreadStatus::Generating
 797            }
 798        } else {
 799            ThreadStatus::Idle
 800        }
 801    }
 802
 803    pub fn token_usage(&self) -> Option<&TokenUsage> {
 804        self.token_usage.as_ref()
 805    }
 806
 807    pub fn has_pending_edit_tool_calls(&self) -> bool {
 808        for entry in self.entries.iter().rev() {
 809            match entry {
 810                AgentThreadEntry::UserMessage(_) => return false,
 811                AgentThreadEntry::ToolCall(
 812                    call @ ToolCall {
 813                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 814                        ..
 815                    },
 816                ) if call.diffs().next().is_some() => {
 817                    return true;
 818                }
 819                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 820            }
 821        }
 822
 823        false
 824    }
 825
 826    pub fn used_tools_since_last_user_message(&self) -> bool {
 827        for entry in self.entries.iter().rev() {
 828            match entry {
 829                AgentThreadEntry::UserMessage(..) => return false,
 830                AgentThreadEntry::AssistantMessage(..) => continue,
 831                AgentThreadEntry::ToolCall(..) => return true,
 832            }
 833        }
 834
 835        false
 836    }
 837
 838    pub fn handle_session_update(
 839        &mut self,
 840        update: acp::SessionUpdate,
 841        cx: &mut Context<Self>,
 842    ) -> Result<(), acp::Error> {
 843        match update {
 844            acp::SessionUpdate::UserMessageChunk { content } => {
 845                self.push_user_content_block(None, content, cx);
 846            }
 847            acp::SessionUpdate::AgentMessageChunk { content } => {
 848                self.push_assistant_content_block(content, false, cx);
 849            }
 850            acp::SessionUpdate::AgentThoughtChunk { content } => {
 851                self.push_assistant_content_block(content, true, cx);
 852            }
 853            acp::SessionUpdate::ToolCall(tool_call) => {
 854                self.upsert_tool_call(tool_call, cx)?;
 855            }
 856            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 857                self.update_tool_call(tool_call_update, cx)?;
 858            }
 859            acp::SessionUpdate::Plan(plan) => {
 860                self.update_plan(plan, cx);
 861            }
 862        }
 863        Ok(())
 864    }
 865
 866    pub fn push_user_content_block(
 867        &mut self,
 868        message_id: Option<UserMessageId>,
 869        chunk: acp::ContentBlock,
 870        cx: &mut Context<Self>,
 871    ) {
 872        let language_registry = self.project.read(cx).languages().clone();
 873        let entries_len = self.entries.len();
 874
 875        if let Some(last_entry) = self.entries.last_mut()
 876            && let AgentThreadEntry::UserMessage(UserMessage {
 877                id,
 878                content,
 879                chunks,
 880                ..
 881            }) = last_entry
 882        {
 883            *id = message_id.or(id.take());
 884            content.append(chunk.clone(), &language_registry, cx);
 885            chunks.push(chunk);
 886            let idx = entries_len - 1;
 887            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 888        } else {
 889            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 890            self.push_entry(
 891                AgentThreadEntry::UserMessage(UserMessage {
 892                    id: message_id,
 893                    content,
 894                    chunks: vec![chunk],
 895                    checkpoint: None,
 896                }),
 897                cx,
 898            );
 899        }
 900    }
 901
 902    pub fn push_assistant_content_block(
 903        &mut self,
 904        chunk: acp::ContentBlock,
 905        is_thought: bool,
 906        cx: &mut Context<Self>,
 907    ) {
 908        let language_registry = self.project.read(cx).languages().clone();
 909        let entries_len = self.entries.len();
 910        if let Some(last_entry) = self.entries.last_mut()
 911            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 912        {
 913            let idx = entries_len - 1;
 914            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 915            match (chunks.last_mut(), is_thought) {
 916                (Some(AssistantMessageChunk::Message { block }), false)
 917                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 918                    block.append(chunk, &language_registry, cx)
 919                }
 920                _ => {
 921                    let block = ContentBlock::new(chunk, &language_registry, cx);
 922                    if is_thought {
 923                        chunks.push(AssistantMessageChunk::Thought { block })
 924                    } else {
 925                        chunks.push(AssistantMessageChunk::Message { block })
 926                    }
 927                }
 928            }
 929        } else {
 930            let block = ContentBlock::new(chunk, &language_registry, cx);
 931            let chunk = if is_thought {
 932                AssistantMessageChunk::Thought { block }
 933            } else {
 934                AssistantMessageChunk::Message { block }
 935            };
 936
 937            self.push_entry(
 938                AgentThreadEntry::AssistantMessage(AssistantMessage {
 939                    chunks: vec![chunk],
 940                }),
 941                cx,
 942            );
 943        }
 944    }
 945
 946    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 947        self.entries.push(entry);
 948        cx.emit(AcpThreadEvent::NewEntry);
 949    }
 950
 951    pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
 952        self.title = title;
 953        cx.emit(AcpThreadEvent::TitleUpdated);
 954        Ok(())
 955    }
 956
 957    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
 958        self.token_usage = usage;
 959        cx.emit(AcpThreadEvent::TokenUsageUpdated);
 960    }
 961
 962    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
 963        cx.emit(AcpThreadEvent::Retry(status));
 964    }
 965
 966    pub fn update_tool_call(
 967        &mut self,
 968        update: impl Into<ToolCallUpdate>,
 969        cx: &mut Context<Self>,
 970    ) -> Result<()> {
 971        let update = update.into();
 972        let languages = self.project.read(cx).languages().clone();
 973
 974        let (ix, current_call) = self
 975            .tool_call_mut(update.id())
 976            .context("Tool call not found")?;
 977        match update {
 978            ToolCallUpdate::UpdateFields(update) => {
 979                let location_updated = update.fields.locations.is_some();
 980                current_call.update_fields(update.fields, languages, cx);
 981                if location_updated {
 982                    self.resolve_locations(update.id.clone(), cx);
 983                }
 984            }
 985            ToolCallUpdate::UpdateDiff(update) => {
 986                current_call.content.clear();
 987                current_call
 988                    .content
 989                    .push(ToolCallContent::Diff(update.diff));
 990            }
 991            ToolCallUpdate::UpdateTerminal(update) => {
 992                current_call.content.clear();
 993                current_call
 994                    .content
 995                    .push(ToolCallContent::Terminal(update.terminal));
 996            }
 997        }
 998
 999        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1000
1001        Ok(())
1002    }
1003
1004    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1005    pub fn upsert_tool_call(
1006        &mut self,
1007        tool_call: acp::ToolCall,
1008        cx: &mut Context<Self>,
1009    ) -> Result<(), acp::Error> {
1010        let status = tool_call.status.into();
1011        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1012    }
1013
1014    /// Fails if id does not match an existing entry.
1015    pub fn upsert_tool_call_inner(
1016        &mut self,
1017        tool_call_update: acp::ToolCallUpdate,
1018        status: ToolCallStatus,
1019        cx: &mut Context<Self>,
1020    ) -> Result<(), acp::Error> {
1021        let language_registry = self.project.read(cx).languages().clone();
1022        let id = tool_call_update.id.clone();
1023
1024        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1025            current_call.update_fields(tool_call_update.fields, language_registry, cx);
1026            current_call.status = status;
1027
1028            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1029        } else {
1030            let call =
1031                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1032            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1033        };
1034
1035        self.resolve_locations(id, cx);
1036        Ok(())
1037    }
1038
1039    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1040        // The tool call we are looking for is typically the last one, or very close to the end.
1041        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1042        self.entries
1043            .iter_mut()
1044            .enumerate()
1045            .rev()
1046            .find_map(|(index, tool_call)| {
1047                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1048                    && &tool_call.id == id
1049                {
1050                    Some((index, tool_call))
1051                } else {
1052                    None
1053                }
1054            })
1055    }
1056
1057    pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1058        self.entries
1059            .iter()
1060            .enumerate()
1061            .rev()
1062            .find_map(|(index, tool_call)| {
1063                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1064                    && &tool_call.id == id
1065                {
1066                    Some((index, tool_call))
1067                } else {
1068                    None
1069                }
1070            })
1071    }
1072
1073    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1074        let project = self.project.clone();
1075        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1076            return;
1077        };
1078        let task = tool_call.resolve_locations(project, cx);
1079        cx.spawn(async move |this, cx| {
1080            let resolved_locations = task.await;
1081            this.update(cx, |this, cx| {
1082                let project = this.project.clone();
1083                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1084                    return;
1085                };
1086                if let Some(Some(location)) = resolved_locations.last() {
1087                    project.update(cx, |project, cx| {
1088                        if let Some(agent_location) = project.agent_location() {
1089                            let should_ignore = agent_location.buffer == location.buffer
1090                                && location
1091                                    .buffer
1092                                    .update(cx, |buffer, _| {
1093                                        let snapshot = buffer.snapshot();
1094                                        let old_position =
1095                                            agent_location.position.to_point(&snapshot);
1096                                        let new_position = location.position.to_point(&snapshot);
1097                                        // ignore this so that when we get updates from the edit tool
1098                                        // the position doesn't reset to the startof line
1099                                        old_position.row == new_position.row
1100                                            && old_position.column > new_position.column
1101                                    })
1102                                    .ok()
1103                                    .unwrap_or_default();
1104                            if !should_ignore {
1105                                project.set_agent_location(Some(location.clone()), cx);
1106                            }
1107                        }
1108                    });
1109                }
1110                if tool_call.resolved_locations != resolved_locations {
1111                    tool_call.resolved_locations = resolved_locations;
1112                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1113                }
1114            })
1115        })
1116        .detach();
1117    }
1118
1119    pub fn request_tool_call_authorization(
1120        &mut self,
1121        tool_call: acp::ToolCallUpdate,
1122        options: Vec<acp::PermissionOption>,
1123        cx: &mut Context<Self>,
1124    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1125        let (tx, rx) = oneshot::channel();
1126
1127        let status = ToolCallStatus::WaitingForConfirmation {
1128            options,
1129            respond_tx: tx,
1130        };
1131
1132        self.upsert_tool_call_inner(tool_call, status, cx)?;
1133        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1134        Ok(rx)
1135    }
1136
1137    pub fn authorize_tool_call(
1138        &mut self,
1139        id: acp::ToolCallId,
1140        option_id: acp::PermissionOptionId,
1141        option_kind: acp::PermissionOptionKind,
1142        cx: &mut Context<Self>,
1143    ) {
1144        let Some((ix, call)) = self.tool_call_mut(&id) else {
1145            return;
1146        };
1147
1148        let new_status = match option_kind {
1149            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1150                ToolCallStatus::Rejected
1151            }
1152            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1153                ToolCallStatus::InProgress
1154            }
1155        };
1156
1157        let curr_status = mem::replace(&mut call.status, new_status);
1158
1159        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1160            respond_tx.send(option_id).log_err();
1161        } else if cfg!(debug_assertions) {
1162            panic!("tried to authorize an already authorized tool call");
1163        }
1164
1165        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1166    }
1167
1168    /// Returns true if the last turn is awaiting tool authorization
1169    pub fn waiting_for_tool_confirmation(&self) -> bool {
1170        for entry in self.entries.iter().rev() {
1171            match &entry {
1172                AgentThreadEntry::ToolCall(call) => match call.status {
1173                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1174                    ToolCallStatus::Pending
1175                    | ToolCallStatus::InProgress
1176                    | ToolCallStatus::Completed
1177                    | ToolCallStatus::Failed
1178                    | ToolCallStatus::Rejected
1179                    | ToolCallStatus::Canceled => continue,
1180                },
1181                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1182                    // Reached the beginning of the turn
1183                    return false;
1184                }
1185            }
1186        }
1187        false
1188    }
1189
1190    pub fn plan(&self) -> &Plan {
1191        &self.plan
1192    }
1193
1194    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1195        let new_entries_len = request.entries.len();
1196        let mut new_entries = request.entries.into_iter();
1197
1198        // Reuse existing markdown to prevent flickering
1199        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1200            let PlanEntry {
1201                content,
1202                priority,
1203                status,
1204            } = old;
1205            content.update(cx, |old, cx| {
1206                old.replace(new.content, cx);
1207            });
1208            *priority = new.priority;
1209            *status = new.status;
1210        }
1211        for new in new_entries {
1212            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1213        }
1214        self.plan.entries.truncate(new_entries_len);
1215
1216        cx.notify();
1217    }
1218
1219    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1220        self.plan
1221            .entries
1222            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1223        cx.notify();
1224    }
1225
1226    #[cfg(any(test, feature = "test-support"))]
1227    pub fn send_raw(
1228        &mut self,
1229        message: &str,
1230        cx: &mut Context<Self>,
1231    ) -> BoxFuture<'static, Result<()>> {
1232        self.send(
1233            vec![acp::ContentBlock::Text(acp::TextContent {
1234                text: message.to_string(),
1235                annotations: None,
1236            })],
1237            cx,
1238        )
1239    }
1240
1241    pub fn send(
1242        &mut self,
1243        message: Vec<acp::ContentBlock>,
1244        cx: &mut Context<Self>,
1245    ) -> BoxFuture<'static, Result<()>> {
1246        let block = ContentBlock::new_combined(
1247            message.clone(),
1248            self.project.read(cx).languages().clone(),
1249            cx,
1250        );
1251        let request = acp::PromptRequest {
1252            prompt: message.clone(),
1253            session_id: self.session_id.clone(),
1254        };
1255        let git_store = self.project.read(cx).git_store().clone();
1256
1257        let message_id = if self
1258            .connection
1259            .session_editor(&self.session_id, cx)
1260            .is_some()
1261        {
1262            Some(UserMessageId::new())
1263        } else {
1264            None
1265        };
1266
1267        self.run_turn(cx, async move |this, cx| {
1268            this.update(cx, |this, cx| {
1269                this.push_entry(
1270                    AgentThreadEntry::UserMessage(UserMessage {
1271                        id: message_id.clone(),
1272                        content: block,
1273                        chunks: message,
1274                        checkpoint: None,
1275                    }),
1276                    cx,
1277                );
1278            })
1279            .ok();
1280
1281            let old_checkpoint = git_store
1282                .update(cx, |git, cx| git.checkpoint(cx))?
1283                .await
1284                .context("failed to get old checkpoint")
1285                .log_err();
1286            this.update(cx, |this, cx| {
1287                if let Some((_ix, message)) = this.last_user_message() {
1288                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1289                        git_checkpoint,
1290                        show: false,
1291                    });
1292                }
1293                this.connection.prompt(message_id, request, cx)
1294            })?
1295            .await
1296        })
1297    }
1298
1299    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1300        self.run_turn(cx, async move |this, cx| {
1301            this.update(cx, |this, cx| {
1302                this.connection
1303                    .resume(&this.session_id, cx)
1304                    .map(|resume| resume.run(cx))
1305            })?
1306            .context("resuming a session is not supported")?
1307            .await
1308        })
1309    }
1310
1311    fn run_turn(
1312        &mut self,
1313        cx: &mut Context<Self>,
1314        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1315    ) -> BoxFuture<'static, Result<()>> {
1316        self.clear_completed_plan_entries(cx);
1317
1318        let (tx, rx) = oneshot::channel();
1319        let cancel_task = self.cancel(cx);
1320
1321        self.send_task = Some(cx.spawn(async move |this, cx| {
1322            cancel_task.await;
1323            tx.send(f(this, cx).await).ok();
1324        }));
1325
1326        cx.spawn(async move |this, cx| {
1327            let response = rx.await;
1328
1329            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1330                .await?;
1331
1332            this.update(cx, |this, cx| {
1333                this.project
1334                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1335                match response {
1336                    Ok(Err(e)) => {
1337                        this.send_task.take();
1338                        cx.emit(AcpThreadEvent::Error);
1339                        Err(e)
1340                    }
1341                    result => {
1342                        let canceled = matches!(
1343                            result,
1344                            Ok(Ok(acp::PromptResponse {
1345                                stop_reason: acp::StopReason::Canceled
1346                            }))
1347                        );
1348
1349                        // We only take the task if the current prompt wasn't canceled.
1350                        //
1351                        // This prompt may have been canceled because another one was sent
1352                        // while it was still generating. In these cases, dropping `send_task`
1353                        // would cause the next generation to be canceled.
1354                        if !canceled {
1355                            this.send_task.take();
1356                        }
1357
1358                        cx.emit(AcpThreadEvent::Stopped);
1359                        Ok(())
1360                    }
1361                }
1362            })?
1363        })
1364        .boxed()
1365    }
1366
1367    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1368        let Some(send_task) = self.send_task.take() else {
1369            return Task::ready(());
1370        };
1371
1372        for entry in self.entries.iter_mut() {
1373            if let AgentThreadEntry::ToolCall(call) = entry {
1374                let cancel = matches!(
1375                    call.status,
1376                    ToolCallStatus::Pending
1377                        | ToolCallStatus::WaitingForConfirmation { .. }
1378                        | ToolCallStatus::InProgress
1379                );
1380
1381                if cancel {
1382                    call.status = ToolCallStatus::Canceled;
1383                }
1384            }
1385        }
1386
1387        self.connection.cancel(&self.session_id, cx);
1388
1389        // Wait for the send task to complete
1390        cx.foreground_executor().spawn(send_task)
1391    }
1392
1393    /// Rewinds this thread to before the entry at `index`, removing it and all
1394    /// subsequent entries while reverting any changes made from that point.
1395    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1396        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1397            return Task::ready(Err(anyhow!("not supported")));
1398        };
1399        let Some(message) = self.user_message(&id) else {
1400            return Task::ready(Err(anyhow!("message not found")));
1401        };
1402
1403        let checkpoint = message
1404            .checkpoint
1405            .as_ref()
1406            .map(|c| c.git_checkpoint.clone());
1407
1408        let git_store = self.project.read(cx).git_store().clone();
1409        cx.spawn(async move |this, cx| {
1410            if let Some(checkpoint) = checkpoint {
1411                git_store
1412                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1413                    .await?;
1414            }
1415
1416            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1417                .await?;
1418            this.update(cx, |this, cx| {
1419                if let Some((ix, _)) = this.user_message_mut(&id) {
1420                    let range = ix..this.entries.len();
1421                    this.entries.truncate(ix);
1422                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1423                }
1424            })
1425        })
1426    }
1427
1428    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1429        let git_store = self.project.read(cx).git_store().clone();
1430
1431        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1432            if let Some(checkpoint) = message.checkpoint.as_ref() {
1433                checkpoint.git_checkpoint.clone()
1434            } else {
1435                return Task::ready(Ok(()));
1436            }
1437        } else {
1438            return Task::ready(Ok(()));
1439        };
1440
1441        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1442        cx.spawn(async move |this, cx| {
1443            let new_checkpoint = new_checkpoint
1444                .await
1445                .context("failed to get new checkpoint")
1446                .log_err();
1447            if let Some(new_checkpoint) = new_checkpoint {
1448                let equal = git_store
1449                    .update(cx, |git, cx| {
1450                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1451                    })?
1452                    .await
1453                    .unwrap_or(true);
1454                this.update(cx, |this, cx| {
1455                    let (ix, message) = this.last_user_message().context("no user message")?;
1456                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1457                    checkpoint.show = !equal;
1458                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1459                    anyhow::Ok(())
1460                })??;
1461            }
1462
1463            Ok(())
1464        })
1465    }
1466
1467    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1468        self.entries
1469            .iter_mut()
1470            .enumerate()
1471            .rev()
1472            .find_map(|(ix, entry)| {
1473                if let AgentThreadEntry::UserMessage(message) = entry {
1474                    Some((ix, message))
1475                } else {
1476                    None
1477                }
1478            })
1479    }
1480
1481    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1482        self.entries.iter().find_map(|entry| {
1483            if let AgentThreadEntry::UserMessage(message) = entry {
1484                if message.id.as_ref() == Some(id) {
1485                    Some(message)
1486                } else {
1487                    None
1488                }
1489            } else {
1490                None
1491            }
1492        })
1493    }
1494
1495    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1496        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1497            if let AgentThreadEntry::UserMessage(message) = entry {
1498                if message.id.as_ref() == Some(id) {
1499                    Some((ix, message))
1500                } else {
1501                    None
1502                }
1503            } else {
1504                None
1505            }
1506        })
1507    }
1508
1509    pub fn read_text_file(
1510        &self,
1511        path: PathBuf,
1512        line: Option<u32>,
1513        limit: Option<u32>,
1514        reuse_shared_snapshot: bool,
1515        cx: &mut Context<Self>,
1516    ) -> Task<Result<String>> {
1517        let project = self.project.clone();
1518        let action_log = self.action_log.clone();
1519        cx.spawn(async move |this, cx| {
1520            let load = project.update(cx, |project, cx| {
1521                let path = project
1522                    .project_path_for_absolute_path(&path, cx)
1523                    .context("invalid path")?;
1524                anyhow::Ok(project.open_buffer(path, cx))
1525            });
1526            let buffer = load??.await?;
1527
1528            let snapshot = if reuse_shared_snapshot {
1529                this.read_with(cx, |this, _| {
1530                    this.shared_buffers.get(&buffer.clone()).cloned()
1531                })
1532                .log_err()
1533                .flatten()
1534            } else {
1535                None
1536            };
1537
1538            let snapshot = if let Some(snapshot) = snapshot {
1539                snapshot
1540            } else {
1541                action_log.update(cx, |action_log, cx| {
1542                    action_log.buffer_read(buffer.clone(), cx);
1543                })?;
1544                project.update(cx, |project, cx| {
1545                    let position = buffer
1546                        .read(cx)
1547                        .snapshot()
1548                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1549                    project.set_agent_location(
1550                        Some(AgentLocation {
1551                            buffer: buffer.downgrade(),
1552                            position,
1553                        }),
1554                        cx,
1555                    );
1556                })?;
1557
1558                buffer.update(cx, |buffer, _| buffer.snapshot())?
1559            };
1560
1561            this.update(cx, |this, _| {
1562                let text = snapshot.text();
1563                this.shared_buffers.insert(buffer.clone(), snapshot);
1564                if line.is_none() && limit.is_none() {
1565                    return Ok(text);
1566                }
1567                let limit = limit.unwrap_or(u32::MAX) as usize;
1568                let Some(line) = line else {
1569                    return Ok(text.lines().take(limit).collect::<String>());
1570                };
1571
1572                let count = text.lines().count();
1573                if count < line as usize {
1574                    anyhow::bail!("There are only {} lines", count);
1575                }
1576                Ok(text
1577                    .lines()
1578                    .skip(line as usize + 1)
1579                    .take(limit)
1580                    .collect::<String>())
1581            })?
1582        })
1583    }
1584
1585    pub fn write_text_file(
1586        &self,
1587        path: PathBuf,
1588        content: String,
1589        cx: &mut Context<Self>,
1590    ) -> Task<Result<()>> {
1591        let project = self.project.clone();
1592        let action_log = self.action_log.clone();
1593        cx.spawn(async move |this, cx| {
1594            let load = project.update(cx, |project, cx| {
1595                let path = project
1596                    .project_path_for_absolute_path(&path, cx)
1597                    .context("invalid path")?;
1598                anyhow::Ok(project.open_buffer(path, cx))
1599            });
1600            let buffer = load??.await?;
1601            let snapshot = this.update(cx, |this, cx| {
1602                this.shared_buffers
1603                    .get(&buffer)
1604                    .cloned()
1605                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1606            })?;
1607            let edits = cx
1608                .background_executor()
1609                .spawn(async move {
1610                    let old_text = snapshot.text();
1611                    text_diff(old_text.as_str(), &content)
1612                        .into_iter()
1613                        .map(|(range, replacement)| {
1614                            (
1615                                snapshot.anchor_after(range.start)
1616                                    ..snapshot.anchor_before(range.end),
1617                                replacement,
1618                            )
1619                        })
1620                        .collect::<Vec<_>>()
1621                })
1622                .await;
1623
1624            project.update(cx, |project, cx| {
1625                project.set_agent_location(
1626                    Some(AgentLocation {
1627                        buffer: buffer.downgrade(),
1628                        position: edits
1629                            .last()
1630                            .map(|(range, _)| range.end)
1631                            .unwrap_or(Anchor::MIN),
1632                    }),
1633                    cx,
1634                );
1635            })?;
1636
1637            let format_on_save = cx.update(|cx| {
1638                action_log.update(cx, |action_log, cx| {
1639                    action_log.buffer_read(buffer.clone(), cx);
1640                });
1641
1642                let format_on_save = buffer.update(cx, |buffer, cx| {
1643                    buffer.edit(edits, None, cx);
1644
1645                    let settings = language::language_settings::language_settings(
1646                        buffer.language().map(|l| l.name()),
1647                        buffer.file(),
1648                        cx,
1649                    );
1650
1651                    settings.format_on_save != FormatOnSave::Off
1652                });
1653                action_log.update(cx, |action_log, cx| {
1654                    action_log.buffer_edited(buffer.clone(), cx);
1655                });
1656                format_on_save
1657            })?;
1658
1659            if format_on_save {
1660                let format_task = project.update(cx, |project, cx| {
1661                    project.format(
1662                        HashSet::from_iter([buffer.clone()]),
1663                        LspFormatTarget::Buffers,
1664                        false,
1665                        FormatTrigger::Save,
1666                        cx,
1667                    )
1668                })?;
1669                format_task.await.log_err();
1670
1671                action_log.update(cx, |action_log, cx| {
1672                    action_log.buffer_edited(buffer.clone(), cx);
1673                })?;
1674            }
1675
1676            project
1677                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1678                .await
1679        })
1680    }
1681
1682    pub fn to_markdown(&self, cx: &App) -> String {
1683        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1684    }
1685
1686    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1687        cx.emit(AcpThreadEvent::ServerExited(status));
1688    }
1689}
1690
1691fn markdown_for_raw_output(
1692    raw_output: &serde_json::Value,
1693    language_registry: &Arc<LanguageRegistry>,
1694    cx: &mut App,
1695) -> Option<Entity<Markdown>> {
1696    match raw_output {
1697        serde_json::Value::Null => None,
1698        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1699            Markdown::new(
1700                value.to_string().into(),
1701                Some(language_registry.clone()),
1702                None,
1703                cx,
1704            )
1705        })),
1706        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1707            Markdown::new(
1708                value.to_string().into(),
1709                Some(language_registry.clone()),
1710                None,
1711                cx,
1712            )
1713        })),
1714        serde_json::Value::String(value) => Some(cx.new(|cx| {
1715            Markdown::new(
1716                value.clone().into(),
1717                Some(language_registry.clone()),
1718                None,
1719                cx,
1720            )
1721        })),
1722        value => Some(cx.new(|cx| {
1723            Markdown::new(
1724                format!("```json\n{}\n```", value).into(),
1725                Some(language_registry.clone()),
1726                None,
1727                cx,
1728            )
1729        })),
1730    }
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735    use super::*;
1736    use anyhow::anyhow;
1737    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1738    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1739    use indoc::indoc;
1740    use project::{FakeFs, Fs};
1741    use rand::Rng as _;
1742    use serde_json::json;
1743    use settings::SettingsStore;
1744    use smol::stream::StreamExt as _;
1745    use std::{
1746        any::Any,
1747        cell::RefCell,
1748        path::Path,
1749        rc::Rc,
1750        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1751        time::Duration,
1752    };
1753    use util::path;
1754
1755    fn init_test(cx: &mut TestAppContext) {
1756        env_logger::try_init().ok();
1757        cx.update(|cx| {
1758            let settings_store = SettingsStore::test(cx);
1759            cx.set_global(settings_store);
1760            Project::init_settings(cx);
1761            language::init(cx);
1762        });
1763    }
1764
1765    #[gpui::test]
1766    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1767        init_test(cx);
1768
1769        let fs = FakeFs::new(cx.executor());
1770        let project = Project::test(fs, [], cx).await;
1771        let connection = Rc::new(FakeAgentConnection::new());
1772        let thread = cx
1773            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1774            .await
1775            .unwrap();
1776
1777        // Test creating a new user message
1778        thread.update(cx, |thread, cx| {
1779            thread.push_user_content_block(
1780                None,
1781                acp::ContentBlock::Text(acp::TextContent {
1782                    annotations: None,
1783                    text: "Hello, ".to_string(),
1784                }),
1785                cx,
1786            );
1787        });
1788
1789        thread.update(cx, |thread, cx| {
1790            assert_eq!(thread.entries.len(), 1);
1791            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1792                assert_eq!(user_msg.id, None);
1793                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1794            } else {
1795                panic!("Expected UserMessage");
1796            }
1797        });
1798
1799        // Test appending to existing user message
1800        let message_1_id = UserMessageId::new();
1801        thread.update(cx, |thread, cx| {
1802            thread.push_user_content_block(
1803                Some(message_1_id.clone()),
1804                acp::ContentBlock::Text(acp::TextContent {
1805                    annotations: None,
1806                    text: "world!".to_string(),
1807                }),
1808                cx,
1809            );
1810        });
1811
1812        thread.update(cx, |thread, cx| {
1813            assert_eq!(thread.entries.len(), 1);
1814            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1815                assert_eq!(user_msg.id, Some(message_1_id));
1816                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1817            } else {
1818                panic!("Expected UserMessage");
1819            }
1820        });
1821
1822        // Test creating new user message after assistant message
1823        thread.update(cx, |thread, cx| {
1824            thread.push_assistant_content_block(
1825                acp::ContentBlock::Text(acp::TextContent {
1826                    annotations: None,
1827                    text: "Assistant response".to_string(),
1828                }),
1829                false,
1830                cx,
1831            );
1832        });
1833
1834        let message_2_id = UserMessageId::new();
1835        thread.update(cx, |thread, cx| {
1836            thread.push_user_content_block(
1837                Some(message_2_id.clone()),
1838                acp::ContentBlock::Text(acp::TextContent {
1839                    annotations: None,
1840                    text: "New user message".to_string(),
1841                }),
1842                cx,
1843            );
1844        });
1845
1846        thread.update(cx, |thread, cx| {
1847            assert_eq!(thread.entries.len(), 3);
1848            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1849                assert_eq!(user_msg.id, Some(message_2_id));
1850                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1851            } else {
1852                panic!("Expected UserMessage at index 2");
1853            }
1854        });
1855    }
1856
1857    #[gpui::test]
1858    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1859        init_test(cx);
1860
1861        let fs = FakeFs::new(cx.executor());
1862        let project = Project::test(fs, [], cx).await;
1863        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1864            |_, thread, mut cx| {
1865                async move {
1866                    thread.update(&mut cx, |thread, cx| {
1867                        thread
1868                            .handle_session_update(
1869                                acp::SessionUpdate::AgentThoughtChunk {
1870                                    content: "Thinking ".into(),
1871                                },
1872                                cx,
1873                            )
1874                            .unwrap();
1875                        thread
1876                            .handle_session_update(
1877                                acp::SessionUpdate::AgentThoughtChunk {
1878                                    content: "hard!".into(),
1879                                },
1880                                cx,
1881                            )
1882                            .unwrap();
1883                    })?;
1884                    Ok(acp::PromptResponse {
1885                        stop_reason: acp::StopReason::EndTurn,
1886                    })
1887                }
1888                .boxed_local()
1889            },
1890        ));
1891
1892        let thread = cx
1893            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1894            .await
1895            .unwrap();
1896
1897        thread
1898            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1899            .await
1900            .unwrap();
1901
1902        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1903        assert_eq!(
1904            output,
1905            indoc! {r#"
1906            ## User
1907
1908            Hello from Zed!
1909
1910            ## Assistant
1911
1912            <thinking>
1913            Thinking hard!
1914            </thinking>
1915
1916            "#}
1917        );
1918    }
1919
1920    #[gpui::test]
1921    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1922        init_test(cx);
1923
1924        let fs = FakeFs::new(cx.executor());
1925        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1926            .await;
1927        let project = Project::test(fs.clone(), [], cx).await;
1928        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1929        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1930        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1931            move |_, thread, mut cx| {
1932                let read_file_tx = read_file_tx.clone();
1933                async move {
1934                    let content = thread
1935                        .update(&mut cx, |thread, cx| {
1936                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1937                        })
1938                        .unwrap()
1939                        .await
1940                        .unwrap();
1941                    assert_eq!(content, "one\ntwo\nthree\n");
1942                    read_file_tx.take().unwrap().send(()).unwrap();
1943                    thread
1944                        .update(&mut cx, |thread, cx| {
1945                            thread.write_text_file(
1946                                path!("/tmp/foo").into(),
1947                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1948                                cx,
1949                            )
1950                        })
1951                        .unwrap()
1952                        .await
1953                        .unwrap();
1954                    Ok(acp::PromptResponse {
1955                        stop_reason: acp::StopReason::EndTurn,
1956                    })
1957                }
1958                .boxed_local()
1959            },
1960        ));
1961
1962        let (worktree, pathbuf) = project
1963            .update(cx, |project, cx| {
1964                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1965            })
1966            .await
1967            .unwrap();
1968        let buffer = project
1969            .update(cx, |project, cx| {
1970                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1971            })
1972            .await
1973            .unwrap();
1974
1975        let thread = cx
1976            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1977            .await
1978            .unwrap();
1979
1980        let request = thread.update(cx, |thread, cx| {
1981            thread.send_raw("Extend the count in /tmp/foo", cx)
1982        });
1983        read_file_rx.await.ok();
1984        buffer.update(cx, |buffer, cx| {
1985            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1986        });
1987        cx.run_until_parked();
1988        assert_eq!(
1989            buffer.read_with(cx, |buffer, _| buffer.text()),
1990            "zero\none\ntwo\nthree\nfour\nfive\n"
1991        );
1992        assert_eq!(
1993            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1994            "zero\none\ntwo\nthree\nfour\nfive\n"
1995        );
1996        request.await.unwrap();
1997    }
1998
1999    #[gpui::test]
2000    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2001        init_test(cx);
2002
2003        let fs = FakeFs::new(cx.executor());
2004        let project = Project::test(fs, [], cx).await;
2005        let id = acp::ToolCallId("test".into());
2006
2007        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2008            let id = id.clone();
2009            move |_, thread, mut cx| {
2010                let id = id.clone();
2011                async move {
2012                    thread
2013                        .update(&mut cx, |thread, cx| {
2014                            thread.handle_session_update(
2015                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2016                                    id: id.clone(),
2017                                    title: "Label".into(),
2018                                    kind: acp::ToolKind::Fetch,
2019                                    status: acp::ToolCallStatus::InProgress,
2020                                    content: vec![],
2021                                    locations: vec![],
2022                                    raw_input: None,
2023                                    raw_output: None,
2024                                }),
2025                                cx,
2026                            )
2027                        })
2028                        .unwrap()
2029                        .unwrap();
2030                    Ok(acp::PromptResponse {
2031                        stop_reason: acp::StopReason::EndTurn,
2032                    })
2033                }
2034                .boxed_local()
2035            }
2036        }));
2037
2038        let thread = cx
2039            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2040            .await
2041            .unwrap();
2042
2043        let request = thread.update(cx, |thread, cx| {
2044            thread.send_raw("Fetch https://example.com", cx)
2045        });
2046
2047        run_until_first_tool_call(&thread, cx).await;
2048
2049        thread.read_with(cx, |thread, _| {
2050            assert!(matches!(
2051                thread.entries[1],
2052                AgentThreadEntry::ToolCall(ToolCall {
2053                    status: ToolCallStatus::InProgress,
2054                    ..
2055                })
2056            ));
2057        });
2058
2059        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2060
2061        thread.read_with(cx, |thread, _| {
2062            assert!(matches!(
2063                &thread.entries[1],
2064                AgentThreadEntry::ToolCall(ToolCall {
2065                    status: ToolCallStatus::Canceled,
2066                    ..
2067                })
2068            ));
2069        });
2070
2071        thread
2072            .update(cx, |thread, cx| {
2073                thread.handle_session_update(
2074                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2075                        id,
2076                        fields: acp::ToolCallUpdateFields {
2077                            status: Some(acp::ToolCallStatus::Completed),
2078                            ..Default::default()
2079                        },
2080                    }),
2081                    cx,
2082                )
2083            })
2084            .unwrap();
2085
2086        request.await.unwrap();
2087
2088        thread.read_with(cx, |thread, _| {
2089            assert!(matches!(
2090                thread.entries[1],
2091                AgentThreadEntry::ToolCall(ToolCall {
2092                    status: ToolCallStatus::Completed,
2093                    ..
2094                })
2095            ));
2096        });
2097    }
2098
2099    #[gpui::test]
2100    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2101        init_test(cx);
2102        let fs = FakeFs::new(cx.background_executor.clone());
2103        fs.insert_tree(path!("/test"), json!({})).await;
2104        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2105
2106        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2107            move |_, thread, mut cx| {
2108                async move {
2109                    thread
2110                        .update(&mut cx, |thread, cx| {
2111                            thread.handle_session_update(
2112                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2113                                    id: acp::ToolCallId("test".into()),
2114                                    title: "Label".into(),
2115                                    kind: acp::ToolKind::Edit,
2116                                    status: acp::ToolCallStatus::Completed,
2117                                    content: vec![acp::ToolCallContent::Diff {
2118                                        diff: acp::Diff {
2119                                            path: "/test/test.txt".into(),
2120                                            old_text: None,
2121                                            new_text: "foo".into(),
2122                                        },
2123                                    }],
2124                                    locations: vec![],
2125                                    raw_input: None,
2126                                    raw_output: None,
2127                                }),
2128                                cx,
2129                            )
2130                        })
2131                        .unwrap()
2132                        .unwrap();
2133                    Ok(acp::PromptResponse {
2134                        stop_reason: acp::StopReason::EndTurn,
2135                    })
2136                }
2137                .boxed_local()
2138            }
2139        }));
2140
2141        let thread = cx
2142            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2143            .await
2144            .unwrap();
2145
2146        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2147            .await
2148            .unwrap();
2149
2150        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2151    }
2152
2153    #[gpui::test(iterations = 10)]
2154    async fn test_checkpoints(cx: &mut TestAppContext) {
2155        init_test(cx);
2156        let fs = FakeFs::new(cx.background_executor.clone());
2157        fs.insert_tree(
2158            path!("/test"),
2159            json!({
2160                ".git": {}
2161            }),
2162        )
2163        .await;
2164        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2165
2166        let simulate_changes = Arc::new(AtomicBool::new(true));
2167        let next_filename = Arc::new(AtomicUsize::new(0));
2168        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2169            let simulate_changes = simulate_changes.clone();
2170            let next_filename = next_filename.clone();
2171            let fs = fs.clone();
2172            move |request, thread, mut cx| {
2173                let fs = fs.clone();
2174                let simulate_changes = simulate_changes.clone();
2175                let next_filename = next_filename.clone();
2176                async move {
2177                    if simulate_changes.load(SeqCst) {
2178                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2179                        fs.write(Path::new(&filename), b"").await?;
2180                    }
2181
2182                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2183                        panic!("expected text content block");
2184                    };
2185                    thread.update(&mut cx, |thread, cx| {
2186                        thread
2187                            .handle_session_update(
2188                                acp::SessionUpdate::AgentMessageChunk {
2189                                    content: content.text.to_uppercase().into(),
2190                                },
2191                                cx,
2192                            )
2193                            .unwrap();
2194                    })?;
2195                    Ok(acp::PromptResponse {
2196                        stop_reason: acp::StopReason::EndTurn,
2197                    })
2198                }
2199                .boxed_local()
2200            }
2201        }));
2202        let thread = cx
2203            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2204            .await
2205            .unwrap();
2206
2207        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2208            .await
2209            .unwrap();
2210        thread.read_with(cx, |thread, cx| {
2211            assert_eq!(
2212                thread.to_markdown(cx),
2213                indoc! {"
2214                    ## User (checkpoint)
2215
2216                    Lorem
2217
2218                    ## Assistant
2219
2220                    LOREM
2221
2222                "}
2223            );
2224        });
2225        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2226
2227        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2228            .await
2229            .unwrap();
2230        thread.read_with(cx, |thread, cx| {
2231            assert_eq!(
2232                thread.to_markdown(cx),
2233                indoc! {"
2234                    ## User (checkpoint)
2235
2236                    Lorem
2237
2238                    ## Assistant
2239
2240                    LOREM
2241
2242                    ## User (checkpoint)
2243
2244                    ipsum
2245
2246                    ## Assistant
2247
2248                    IPSUM
2249
2250                "}
2251            );
2252        });
2253        assert_eq!(
2254            fs.files(),
2255            vec![
2256                Path::new(path!("/test/file-0")),
2257                Path::new(path!("/test/file-1"))
2258            ]
2259        );
2260
2261        // Checkpoint isn't stored when there are no changes.
2262        simulate_changes.store(false, SeqCst);
2263        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2264            .await
2265            .unwrap();
2266        thread.read_with(cx, |thread, cx| {
2267            assert_eq!(
2268                thread.to_markdown(cx),
2269                indoc! {"
2270                    ## User (checkpoint)
2271
2272                    Lorem
2273
2274                    ## Assistant
2275
2276                    LOREM
2277
2278                    ## User (checkpoint)
2279
2280                    ipsum
2281
2282                    ## Assistant
2283
2284                    IPSUM
2285
2286                    ## User
2287
2288                    dolor
2289
2290                    ## Assistant
2291
2292                    DOLOR
2293
2294                "}
2295            );
2296        });
2297        assert_eq!(
2298            fs.files(),
2299            vec![
2300                Path::new(path!("/test/file-0")),
2301                Path::new(path!("/test/file-1"))
2302            ]
2303        );
2304
2305        // Rewinding the conversation truncates the history and restores the checkpoint.
2306        thread
2307            .update(cx, |thread, cx| {
2308                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2309                    panic!("unexpected entries {:?}", thread.entries)
2310                };
2311                thread.rewind(message.id.clone().unwrap(), cx)
2312            })
2313            .await
2314            .unwrap();
2315        thread.read_with(cx, |thread, cx| {
2316            assert_eq!(
2317                thread.to_markdown(cx),
2318                indoc! {"
2319                    ## User (checkpoint)
2320
2321                    Lorem
2322
2323                    ## Assistant
2324
2325                    LOREM
2326
2327                "}
2328            );
2329        });
2330        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2331    }
2332
2333    async fn run_until_first_tool_call(
2334        thread: &Entity<AcpThread>,
2335        cx: &mut TestAppContext,
2336    ) -> usize {
2337        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2338
2339        let subscription = cx.update(|cx| {
2340            cx.subscribe(thread, move |thread, _, cx| {
2341                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2342                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2343                        return tx.try_send(ix).unwrap();
2344                    }
2345                }
2346            })
2347        });
2348
2349        select! {
2350            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2351                panic!("Timeout waiting for tool call")
2352            }
2353            ix = rx.next().fuse() => {
2354                drop(subscription);
2355                ix.unwrap()
2356            }
2357        }
2358    }
2359
2360    #[derive(Clone, Default)]
2361    struct FakeAgentConnection {
2362        auth_methods: Vec<acp::AuthMethod>,
2363        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2364        on_user_message: Option<
2365            Rc<
2366                dyn Fn(
2367                        acp::PromptRequest,
2368                        WeakEntity<AcpThread>,
2369                        AsyncApp,
2370                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2371                    + 'static,
2372            >,
2373        >,
2374    }
2375
2376    impl FakeAgentConnection {
2377        fn new() -> Self {
2378            Self {
2379                auth_methods: Vec::new(),
2380                on_user_message: None,
2381                sessions: Arc::default(),
2382            }
2383        }
2384
2385        #[expect(unused)]
2386        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2387            self.auth_methods = auth_methods;
2388            self
2389        }
2390
2391        fn on_user_message(
2392            mut self,
2393            handler: impl Fn(
2394                acp::PromptRequest,
2395                WeakEntity<AcpThread>,
2396                AsyncApp,
2397            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2398            + 'static,
2399        ) -> Self {
2400            self.on_user_message.replace(Rc::new(handler));
2401            self
2402        }
2403    }
2404
2405    impl AgentConnection for FakeAgentConnection {
2406        fn auth_methods(&self) -> &[acp::AuthMethod] {
2407            &self.auth_methods
2408        }
2409
2410        fn new_thread(
2411            self: Rc<Self>,
2412            project: Entity<Project>,
2413            _cwd: &Path,
2414            cx: &mut App,
2415        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2416            let session_id = acp::SessionId(
2417                rand::thread_rng()
2418                    .sample_iter(&rand::distributions::Alphanumeric)
2419                    .take(7)
2420                    .map(char::from)
2421                    .collect::<String>()
2422                    .into(),
2423            );
2424            let action_log = cx.new(|_| ActionLog::new(project.clone()));
2425            let thread = cx.new(|_cx| {
2426                AcpThread::new(
2427                    "Test",
2428                    self.clone(),
2429                    project,
2430                    action_log,
2431                    session_id.clone(),
2432                )
2433            });
2434            self.sessions.lock().insert(session_id, thread.downgrade());
2435            Task::ready(Ok(thread))
2436        }
2437
2438        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2439            if self.auth_methods().iter().any(|m| m.id == method) {
2440                Task::ready(Ok(()))
2441            } else {
2442                Task::ready(Err(anyhow!("Invalid Auth Method")))
2443            }
2444        }
2445
2446        fn prompt(
2447            &self,
2448            _id: Option<UserMessageId>,
2449            params: acp::PromptRequest,
2450            cx: &mut App,
2451        ) -> Task<gpui::Result<acp::PromptResponse>> {
2452            let sessions = self.sessions.lock();
2453            let thread = sessions.get(&params.session_id).unwrap();
2454            if let Some(handler) = &self.on_user_message {
2455                let handler = handler.clone();
2456                let thread = thread.clone();
2457                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2458            } else {
2459                Task::ready(Ok(acp::PromptResponse {
2460                    stop_reason: acp::StopReason::EndTurn,
2461                }))
2462            }
2463        }
2464
2465        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2466            let sessions = self.sessions.lock();
2467            let thread = sessions.get(session_id).unwrap().clone();
2468
2469            cx.spawn(async move |cx| {
2470                thread
2471                    .update(cx, |thread, cx| thread.cancel(cx))
2472                    .unwrap()
2473                    .await
2474            })
2475            .detach();
2476        }
2477
2478        fn session_editor(
2479            &self,
2480            session_id: &acp::SessionId,
2481            _cx: &mut App,
2482        ) -> Option<Rc<dyn AgentSessionEditor>> {
2483            Some(Rc::new(FakeAgentSessionEditor {
2484                _session_id: session_id.clone(),
2485            }))
2486        }
2487
2488        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2489            self
2490        }
2491    }
2492
2493    struct FakeAgentSessionEditor {
2494        _session_id: acp::SessionId,
2495    }
2496
2497    impl AgentSessionEditor for FakeAgentSessionEditor {
2498        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2499            Task::ready(Ok(()))
2500        }
2501    }
2502}