acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9use serde::{Deserialize, Serialize};
  10pub use terminal::*;
  11
  12use action_log::ActionLog;
  13use agent_client_protocol as acp;
  14use anyhow::{Context as _, Result, anyhow};
  15use chrono::{DateTime, Utc};
  16use editor::Bias;
  17use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  18use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  19use itertools::Itertools;
  20use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  21use markdown::Markdown;
  22use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  23use std::collections::HashMap;
  24use std::error::Error;
  25use std::fmt::{Formatter, Write};
  26use std::ops::Range;
  27use std::process::ExitStatus;
  28use std::rc::Rc;
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use ui::App;
  31use util::ResultExt;
  32
  33#[derive(Debug)]
  34pub struct UserMessage {
  35    pub id: Option<UserMessageId>,
  36    pub content: ContentBlock,
  37    pub chunks: Vec<acp::ContentBlock>,
  38    pub checkpoint: Option<Checkpoint>,
  39}
  40
  41#[derive(Debug)]
  42pub struct Checkpoint {
  43    git_checkpoint: GitStoreCheckpoint,
  44    pub show: bool,
  45}
  46
  47impl UserMessage {
  48    fn to_markdown(&self, cx: &App) -> String {
  49        let mut markdown = String::new();
  50        if self
  51            .checkpoint
  52            .as_ref()
  53            .map_or(false, |checkpoint| checkpoint.show)
  54        {
  55            writeln!(markdown, "## User (checkpoint)").unwrap();
  56        } else {
  57            writeln!(markdown, "## User").unwrap();
  58        }
  59        writeln!(markdown).unwrap();
  60        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  61        writeln!(markdown).unwrap();
  62        markdown
  63    }
  64}
  65
  66#[derive(Debug, PartialEq)]
  67pub struct AssistantMessage {
  68    pub chunks: Vec<AssistantMessageChunk>,
  69}
  70
  71impl AssistantMessage {
  72    pub fn to_markdown(&self, cx: &App) -> String {
  73        format!(
  74            "## Assistant\n\n{}\n\n",
  75            self.chunks
  76                .iter()
  77                .map(|chunk| chunk.to_markdown(cx))
  78                .join("\n\n")
  79        )
  80    }
  81}
  82
  83#[derive(Debug, PartialEq)]
  84pub enum AssistantMessageChunk {
  85    Message { block: ContentBlock },
  86    Thought { block: ContentBlock },
  87}
  88
  89impl AssistantMessageChunk {
  90    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  91        Self::Message {
  92            block: ContentBlock::new(chunk.into(), language_registry, cx),
  93        }
  94    }
  95
  96    fn to_markdown(&self, cx: &App) -> String {
  97        match self {
  98            Self::Message { block } => block.to_markdown(cx).to_string(),
  99            Self::Thought { block } => {
 100                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 101            }
 102        }
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub enum AgentThreadEntry {
 108    UserMessage(UserMessage),
 109    AssistantMessage(AssistantMessage),
 110    ToolCall(ToolCall),
 111}
 112
 113impl AgentThreadEntry {
 114    pub fn to_markdown(&self, cx: &App) -> String {
 115        match self {
 116            Self::UserMessage(message) => message.to_markdown(cx),
 117            Self::AssistantMessage(message) => message.to_markdown(cx),
 118            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 119        }
 120    }
 121
 122    pub fn user_message(&self) -> Option<&UserMessage> {
 123        if let AgentThreadEntry::UserMessage(message) = self {
 124            Some(message)
 125        } else {
 126            None
 127        }
 128    }
 129
 130    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 131        if let AgentThreadEntry::ToolCall(call) = self {
 132            itertools::Either::Left(call.diffs())
 133        } else {
 134            itertools::Either::Right(std::iter::empty())
 135        }
 136    }
 137
 138    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 139        if let AgentThreadEntry::ToolCall(call) = self {
 140            itertools::Either::Left(call.terminals())
 141        } else {
 142            itertools::Either::Right(std::iter::empty())
 143        }
 144    }
 145
 146    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 147        if let AgentThreadEntry::ToolCall(ToolCall {
 148            locations,
 149            resolved_locations,
 150            ..
 151        }) = self
 152        {
 153            Some((
 154                locations.get(ix)?.clone(),
 155                resolved_locations.get(ix)?.clone()?,
 156            ))
 157        } else {
 158            None
 159        }
 160    }
 161}
 162
 163#[derive(Debug)]
 164pub struct ToolCall {
 165    pub id: acp::ToolCallId,
 166    pub label: Entity<Markdown>,
 167    pub kind: acp::ToolKind,
 168    pub content: Vec<ToolCallContent>,
 169    pub status: ToolCallStatus,
 170    pub locations: Vec<acp::ToolCallLocation>,
 171    pub resolved_locations: Vec<Option<AgentLocation>>,
 172    pub raw_input: Option<serde_json::Value>,
 173    pub raw_output: Option<serde_json::Value>,
 174}
 175
 176impl ToolCall {
 177    fn from_acp(
 178        tool_call: acp::ToolCall,
 179        status: ToolCallStatus,
 180        language_registry: Arc<LanguageRegistry>,
 181        cx: &mut App,
 182    ) -> Self {
 183        Self {
 184            id: tool_call.id,
 185            label: cx.new(|cx| {
 186                Markdown::new(
 187                    tool_call.title.into(),
 188                    Some(language_registry.clone()),
 189                    None,
 190                    cx,
 191                )
 192            }),
 193            kind: tool_call.kind,
 194            content: tool_call
 195                .content
 196                .into_iter()
 197                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 198                .collect(),
 199            locations: tool_call.locations,
 200            resolved_locations: Vec::default(),
 201            status,
 202            raw_input: tool_call.raw_input,
 203            raw_output: tool_call.raw_output,
 204        }
 205    }
 206
 207    fn update_fields(
 208        &mut self,
 209        fields: acp::ToolCallUpdateFields,
 210        language_registry: Arc<LanguageRegistry>,
 211        cx: &mut App,
 212    ) {
 213        let acp::ToolCallUpdateFields {
 214            kind,
 215            status,
 216            title,
 217            content,
 218            locations,
 219            raw_input,
 220            raw_output,
 221        } = fields;
 222
 223        if let Some(kind) = kind {
 224            self.kind = kind;
 225        }
 226
 227        if let Some(status) = status {
 228            self.status = status.into();
 229        }
 230
 231        if let Some(title) = title {
 232            self.label.update(cx, |label, cx| {
 233                label.replace(title, cx);
 234            });
 235        }
 236
 237        if let Some(content) = content {
 238            self.content = content
 239                .into_iter()
 240                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 241                .collect();
 242        }
 243
 244        if let Some(locations) = locations {
 245            self.locations = locations;
 246        }
 247
 248        if let Some(raw_input) = raw_input {
 249            self.raw_input = Some(raw_input);
 250        }
 251
 252        if let Some(raw_output) = raw_output {
 253            if self.content.is_empty() {
 254                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 255                {
 256                    self.content
 257                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 258                            markdown,
 259                        }));
 260                }
 261            }
 262            self.raw_output = Some(raw_output);
 263        }
 264    }
 265
 266    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 267        self.content.iter().filter_map(|content| match content {
 268            ToolCallContent::Diff(diff) => Some(diff),
 269            ToolCallContent::ContentBlock(_) => None,
 270            ToolCallContent::Terminal(_) => None,
 271        })
 272    }
 273
 274    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 275        self.content.iter().filter_map(|content| match content {
 276            ToolCallContent::Terminal(terminal) => Some(terminal),
 277            ToolCallContent::ContentBlock(_) => None,
 278            ToolCallContent::Diff(_) => None,
 279        })
 280    }
 281
 282    fn to_markdown(&self, cx: &App) -> String {
 283        let mut markdown = format!(
 284            "**Tool Call: {}**\nStatus: {}\n\n",
 285            self.label.read(cx).source(),
 286            self.status
 287        );
 288        for content in &self.content {
 289            markdown.push_str(content.to_markdown(cx).as_str());
 290            markdown.push_str("\n\n");
 291        }
 292        markdown
 293    }
 294
 295    async fn resolve_location(
 296        location: acp::ToolCallLocation,
 297        project: WeakEntity<Project>,
 298        cx: &mut AsyncApp,
 299    ) -> Option<AgentLocation> {
 300        let buffer = project
 301            .update(cx, |project, cx| {
 302                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 303                    Some(project.open_buffer(path, cx))
 304                } else {
 305                    None
 306                }
 307            })
 308            .ok()??;
 309        let buffer = buffer.await.log_err()?;
 310        let position = buffer
 311            .update(cx, |buffer, _| {
 312                if let Some(row) = location.line {
 313                    let snapshot = buffer.snapshot();
 314                    let column = snapshot.indent_size_for_line(row).len;
 315                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 316                    snapshot.anchor_before(point)
 317                } else {
 318                    Anchor::MIN
 319                }
 320            })
 321            .ok()?;
 322
 323        Some(AgentLocation {
 324            buffer: buffer.downgrade(),
 325            position,
 326        })
 327    }
 328
 329    fn resolve_locations(
 330        &self,
 331        project: Entity<Project>,
 332        cx: &mut App,
 333    ) -> Task<Vec<Option<AgentLocation>>> {
 334        let locations = self.locations.clone();
 335        project.update(cx, |_, cx| {
 336            cx.spawn(async move |project, cx| {
 337                let mut new_locations = Vec::new();
 338                for location in locations {
 339                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 340                }
 341                new_locations
 342            })
 343        })
 344    }
 345}
 346
 347#[derive(Debug)]
 348pub enum ToolCallStatus {
 349    /// The tool call hasn't started running yet, but we start showing it to
 350    /// the user.
 351    Pending,
 352    /// The tool call is waiting for confirmation from the user.
 353    WaitingForConfirmation {
 354        options: Vec<acp::PermissionOption>,
 355        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 356    },
 357    /// The tool call is currently running.
 358    InProgress,
 359    /// The tool call completed successfully.
 360    Completed,
 361    /// The tool call failed.
 362    Failed,
 363    /// The user rejected the tool call.
 364    Rejected,
 365    /// The user canceled generation so the tool call was canceled.
 366    Canceled,
 367}
 368
 369impl From<acp::ToolCallStatus> for ToolCallStatus {
 370    fn from(status: acp::ToolCallStatus) -> Self {
 371        match status {
 372            acp::ToolCallStatus::Pending => Self::Pending,
 373            acp::ToolCallStatus::InProgress => Self::InProgress,
 374            acp::ToolCallStatus::Completed => Self::Completed,
 375            acp::ToolCallStatus::Failed => Self::Failed,
 376        }
 377    }
 378}
 379
 380impl Display for ToolCallStatus {
 381    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 382        write!(
 383            f,
 384            "{}",
 385            match self {
 386                ToolCallStatus::Pending => "Pending",
 387                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 388                ToolCallStatus::InProgress => "In Progress",
 389                ToolCallStatus::Completed => "Completed",
 390                ToolCallStatus::Failed => "Failed",
 391                ToolCallStatus::Rejected => "Rejected",
 392                ToolCallStatus::Canceled => "Canceled",
 393            }
 394        )
 395    }
 396}
 397
 398#[derive(Debug, PartialEq, Clone)]
 399pub enum ContentBlock {
 400    Empty,
 401    Markdown { markdown: Entity<Markdown> },
 402    ResourceLink { resource_link: acp::ResourceLink },
 403}
 404
 405impl ContentBlock {
 406    pub fn new(
 407        block: acp::ContentBlock,
 408        language_registry: &Arc<LanguageRegistry>,
 409        cx: &mut App,
 410    ) -> Self {
 411        let mut this = Self::Empty;
 412        this.append(block, language_registry, cx);
 413        this
 414    }
 415
 416    pub fn new_combined(
 417        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 418        language_registry: Arc<LanguageRegistry>,
 419        cx: &mut App,
 420    ) -> Self {
 421        let mut this = Self::Empty;
 422        for block in blocks {
 423            this.append(block, &language_registry, cx);
 424        }
 425        this
 426    }
 427
 428    pub fn append(
 429        &mut self,
 430        block: acp::ContentBlock,
 431        language_registry: &Arc<LanguageRegistry>,
 432        cx: &mut App,
 433    ) {
 434        if matches!(self, ContentBlock::Empty) {
 435            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 436                *self = ContentBlock::ResourceLink { resource_link };
 437                return;
 438            }
 439        }
 440
 441        let new_content = self.block_string_contents(block);
 442
 443        match self {
 444            ContentBlock::Empty => {
 445                *self = Self::create_markdown_block(new_content, language_registry, cx);
 446            }
 447            ContentBlock::Markdown { markdown } => {
 448                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 449            }
 450            ContentBlock::ResourceLink { resource_link } => {
 451                let existing_content = Self::resource_link_md(&resource_link.uri);
 452                let combined = format!("{}\n{}", existing_content, new_content);
 453
 454                *self = Self::create_markdown_block(combined, language_registry, cx);
 455            }
 456        }
 457    }
 458
 459    fn create_markdown_block(
 460        content: String,
 461        language_registry: &Arc<LanguageRegistry>,
 462        cx: &mut App,
 463    ) -> ContentBlock {
 464        ContentBlock::Markdown {
 465            markdown: cx
 466                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 467        }
 468    }
 469
 470    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 471        match block {
 472            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 473            acp::ContentBlock::ResourceLink(resource_link) => {
 474                Self::resource_link_md(&resource_link.uri)
 475            }
 476            acp::ContentBlock::Resource(acp::EmbeddedResource {
 477                resource:
 478                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 479                        uri,
 480                        ..
 481                    }),
 482                ..
 483            }) => Self::resource_link_md(&uri),
 484            acp::ContentBlock::Image(image) => Self::image_md(&image),
 485            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 486        }
 487    }
 488
 489    fn resource_link_md(uri: &str) -> String {
 490        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 491            uri.as_link().to_string()
 492        } else {
 493            uri.to_string()
 494        }
 495    }
 496
 497    fn image_md(_image: &acp::ImageContent) -> String {
 498        "`Image`".into()
 499    }
 500
 501    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 502        match self {
 503            ContentBlock::Empty => "",
 504            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 505            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 506        }
 507    }
 508
 509    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 510        match self {
 511            ContentBlock::Empty => None,
 512            ContentBlock::Markdown { markdown } => Some(markdown),
 513            ContentBlock::ResourceLink { .. } => None,
 514        }
 515    }
 516
 517    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 518        match self {
 519            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 520            _ => None,
 521        }
 522    }
 523}
 524
 525#[derive(Debug)]
 526pub enum ToolCallContent {
 527    ContentBlock(ContentBlock),
 528    Diff(Entity<Diff>),
 529    Terminal(Entity<Terminal>),
 530}
 531
 532impl ToolCallContent {
 533    pub fn from_acp(
 534        content: acp::ToolCallContent,
 535        language_registry: Arc<LanguageRegistry>,
 536        cx: &mut App,
 537    ) -> Self {
 538        match content {
 539            acp::ToolCallContent::Content { content } => {
 540                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 541            }
 542            acp::ToolCallContent::Diff { diff } => {
 543                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 544            }
 545        }
 546    }
 547
 548    pub fn to_markdown(&self, cx: &App) -> String {
 549        match self {
 550            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 551            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 552            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 553        }
 554    }
 555}
 556
 557#[derive(Debug, PartialEq)]
 558pub enum ToolCallUpdate {
 559    UpdateFields(acp::ToolCallUpdate),
 560    UpdateDiff(ToolCallUpdateDiff),
 561    UpdateTerminal(ToolCallUpdateTerminal),
 562}
 563
 564impl ToolCallUpdate {
 565    fn id(&self) -> &acp::ToolCallId {
 566        match self {
 567            Self::UpdateFields(update) => &update.id,
 568            Self::UpdateDiff(diff) => &diff.id,
 569            Self::UpdateTerminal(terminal) => &terminal.id,
 570        }
 571    }
 572}
 573
 574impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 575    fn from(update: acp::ToolCallUpdate) -> Self {
 576        Self::UpdateFields(update)
 577    }
 578}
 579
 580impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 581    fn from(diff: ToolCallUpdateDiff) -> Self {
 582        Self::UpdateDiff(diff)
 583    }
 584}
 585
 586#[derive(Debug, PartialEq)]
 587pub struct ToolCallUpdateDiff {
 588    pub id: acp::ToolCallId,
 589    pub diff: Entity<Diff>,
 590}
 591
 592impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 593    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 594        Self::UpdateTerminal(terminal)
 595    }
 596}
 597
 598#[derive(Debug, PartialEq)]
 599pub struct ToolCallUpdateTerminal {
 600    pub id: acp::ToolCallId,
 601    pub terminal: Entity<Terminal>,
 602}
 603
 604#[derive(Debug, Default)]
 605pub struct Plan {
 606    pub entries: Vec<PlanEntry>,
 607}
 608
 609#[derive(Debug)]
 610pub struct PlanStats<'a> {
 611    pub in_progress_entry: Option<&'a PlanEntry>,
 612    pub pending: u32,
 613    pub completed: u32,
 614}
 615
 616impl Plan {
 617    pub fn is_empty(&self) -> bool {
 618        self.entries.is_empty()
 619    }
 620
 621    pub fn stats(&self) -> PlanStats<'_> {
 622        let mut stats = PlanStats {
 623            in_progress_entry: None,
 624            pending: 0,
 625            completed: 0,
 626        };
 627
 628        for entry in &self.entries {
 629            match &entry.status {
 630                acp::PlanEntryStatus::Pending => {
 631                    stats.pending += 1;
 632                }
 633                acp::PlanEntryStatus::InProgress => {
 634                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 635                }
 636                acp::PlanEntryStatus::Completed => {
 637                    stats.completed += 1;
 638                }
 639            }
 640        }
 641
 642        stats
 643    }
 644}
 645
 646#[derive(Debug)]
 647pub struct PlanEntry {
 648    pub content: Entity<Markdown>,
 649    pub priority: acp::PlanEntryPriority,
 650    pub status: acp::PlanEntryStatus,
 651}
 652
 653impl PlanEntry {
 654    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 655        Self {
 656            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 657            priority: entry.priority,
 658            status: entry.status,
 659        }
 660    }
 661}
 662
 663#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
 664pub struct AgentServerName(pub SharedString);
 665
 666#[derive(Debug, Clone, Serialize, Deserialize)]
 667pub struct AcpThreadMetadata {
 668    pub agent: AgentServerName,
 669    pub id: acp::SessionId,
 670    pub title: SharedString,
 671    pub updated_at: DateTime<Utc>,
 672}
 673
 674pub struct AcpThread {
 675    title: SharedString,
 676    entries: Vec<AgentThreadEntry>,
 677    plan: Plan,
 678    project: Entity<Project>,
 679    action_log: Entity<ActionLog>,
 680    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 681    send_task: Option<Task<()>>,
 682    connection: Rc<dyn AgentConnection>,
 683    session_id: acp::SessionId,
 684}
 685
 686pub enum AcpThreadEvent {
 687    NewEntry,
 688    EntryUpdated(usize),
 689    EntriesRemoved(Range<usize>),
 690    ToolAuthorizationRequired,
 691    Stopped,
 692    Error,
 693    ServerExited(ExitStatus),
 694}
 695
 696impl EventEmitter<AcpThreadEvent> for AcpThread {}
 697
 698#[derive(PartialEq, Eq)]
 699pub enum ThreadStatus {
 700    Idle,
 701    WaitingForToolConfirmation,
 702    Generating,
 703}
 704
 705#[derive(Debug, Clone)]
 706pub enum LoadError {
 707    Unsupported {
 708        error_message: SharedString,
 709        upgrade_message: SharedString,
 710        upgrade_command: String,
 711    },
 712    Exited(i32),
 713    Other(SharedString),
 714}
 715
 716impl Display for LoadError {
 717    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 718        match self {
 719            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 720            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 721            LoadError::Other(msg) => write!(f, "{}", msg),
 722        }
 723    }
 724}
 725
 726impl Error for LoadError {}
 727
 728impl AcpThread {
 729    pub fn new(
 730        title: impl Into<SharedString>,
 731        connection: Rc<dyn AgentConnection>,
 732        project: Entity<Project>,
 733        session_id: acp::SessionId,
 734        cx: &mut Context<Self>,
 735    ) -> Self {
 736        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 737
 738        Self {
 739            action_log,
 740            shared_buffers: Default::default(),
 741            entries: Default::default(),
 742            plan: Default::default(),
 743            title: title.into(),
 744            project,
 745            send_task: None,
 746            connection,
 747            session_id,
 748        }
 749    }
 750
 751    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 752        &self.connection
 753    }
 754
 755    pub fn action_log(&self) -> &Entity<ActionLog> {
 756        &self.action_log
 757    }
 758
 759    pub fn project(&self) -> &Entity<Project> {
 760        &self.project
 761    }
 762
 763    pub fn title(&self) -> SharedString {
 764        self.title.clone()
 765    }
 766
 767    pub fn entries(&self) -> &[AgentThreadEntry] {
 768        &self.entries
 769    }
 770
 771    pub fn session_id(&self) -> &acp::SessionId {
 772        &self.session_id
 773    }
 774
 775    pub fn status(&self) -> ThreadStatus {
 776        if self.send_task.is_some() {
 777            if self.waiting_for_tool_confirmation() {
 778                ThreadStatus::WaitingForToolConfirmation
 779            } else {
 780                ThreadStatus::Generating
 781            }
 782        } else {
 783            ThreadStatus::Idle
 784        }
 785    }
 786
 787    pub fn has_pending_edit_tool_calls(&self) -> bool {
 788        for entry in self.entries.iter().rev() {
 789            match entry {
 790                AgentThreadEntry::UserMessage(_) => return false,
 791                AgentThreadEntry::ToolCall(
 792                    call @ ToolCall {
 793                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 794                        ..
 795                    },
 796                ) if call.diffs().next().is_some() => {
 797                    return true;
 798                }
 799                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 800            }
 801        }
 802
 803        false
 804    }
 805
 806    pub fn used_tools_since_last_user_message(&self) -> bool {
 807        for entry in self.entries.iter().rev() {
 808            match entry {
 809                AgentThreadEntry::UserMessage(..) => return false,
 810                AgentThreadEntry::AssistantMessage(..) => continue,
 811                AgentThreadEntry::ToolCall(..) => return true,
 812            }
 813        }
 814
 815        false
 816    }
 817
 818    pub fn handle_session_update(
 819        &mut self,
 820        update: acp::SessionUpdate,
 821        cx: &mut Context<Self>,
 822    ) -> Result<(), acp::Error> {
 823        match update {
 824            acp::SessionUpdate::UserMessageChunk { content } => {
 825                self.push_user_content_block(None, content, cx);
 826            }
 827            acp::SessionUpdate::AgentMessageChunk { content } => {
 828                self.push_assistant_content_block(content, false, cx);
 829            }
 830            acp::SessionUpdate::AgentThoughtChunk { content } => {
 831                self.push_assistant_content_block(content, true, cx);
 832            }
 833            acp::SessionUpdate::ToolCall(tool_call) => {
 834                self.upsert_tool_call(tool_call, cx)?;
 835            }
 836            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 837                self.update_tool_call(tool_call_update, cx)?;
 838            }
 839            acp::SessionUpdate::Plan(plan) => {
 840                self.update_plan(plan, cx);
 841            }
 842        }
 843        Ok(())
 844    }
 845
 846    pub fn push_user_content_block(
 847        &mut self,
 848        message_id: Option<UserMessageId>,
 849        chunk: acp::ContentBlock,
 850        cx: &mut Context<Self>,
 851    ) {
 852        let language_registry = self.project.read(cx).languages().clone();
 853        let entries_len = self.entries.len();
 854
 855        if let Some(last_entry) = self.entries.last_mut()
 856            && let AgentThreadEntry::UserMessage(UserMessage {
 857                id,
 858                content,
 859                chunks,
 860                ..
 861            }) = last_entry
 862        {
 863            *id = message_id.or(id.take());
 864            content.append(chunk.clone(), &language_registry, cx);
 865            chunks.push(chunk);
 866            let idx = entries_len - 1;
 867            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 868        } else {
 869            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 870            self.push_entry(
 871                AgentThreadEntry::UserMessage(UserMessage {
 872                    id: message_id,
 873                    content,
 874                    chunks: vec![chunk],
 875                    checkpoint: None,
 876                }),
 877                cx,
 878            );
 879        }
 880    }
 881
 882    pub fn push_assistant_content_block(
 883        &mut self,
 884        chunk: acp::ContentBlock,
 885        is_thought: bool,
 886        cx: &mut Context<Self>,
 887    ) {
 888        let language_registry = self.project.read(cx).languages().clone();
 889        let entries_len = self.entries.len();
 890        if let Some(last_entry) = self.entries.last_mut()
 891            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 892        {
 893            let idx = entries_len - 1;
 894            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 895            match (chunks.last_mut(), is_thought) {
 896                (Some(AssistantMessageChunk::Message { block }), false)
 897                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 898                    block.append(chunk, &language_registry, cx)
 899                }
 900                _ => {
 901                    let block = ContentBlock::new(chunk, &language_registry, cx);
 902                    if is_thought {
 903                        chunks.push(AssistantMessageChunk::Thought { block })
 904                    } else {
 905                        chunks.push(AssistantMessageChunk::Message { block })
 906                    }
 907                }
 908            }
 909        } else {
 910            let block = ContentBlock::new(chunk, &language_registry, cx);
 911            let chunk = if is_thought {
 912                AssistantMessageChunk::Thought { block }
 913            } else {
 914                AssistantMessageChunk::Message { block }
 915            };
 916
 917            self.push_entry(
 918                AgentThreadEntry::AssistantMessage(AssistantMessage {
 919                    chunks: vec![chunk],
 920                }),
 921                cx,
 922            );
 923        }
 924    }
 925
 926    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 927        self.entries.push(entry);
 928        cx.emit(AcpThreadEvent::NewEntry);
 929    }
 930
 931    pub fn update_tool_call(
 932        &mut self,
 933        update: impl Into<ToolCallUpdate>,
 934        cx: &mut Context<Self>,
 935    ) -> Result<()> {
 936        let update = update.into();
 937        let languages = self.project.read(cx).languages().clone();
 938
 939        let (ix, current_call) = self
 940            .tool_call_mut(update.id())
 941            .context("Tool call not found")?;
 942        match update {
 943            ToolCallUpdate::UpdateFields(update) => {
 944                let location_updated = update.fields.locations.is_some();
 945                current_call.update_fields(update.fields, languages, cx);
 946                if location_updated {
 947                    self.resolve_locations(update.id.clone(), cx);
 948                }
 949            }
 950            ToolCallUpdate::UpdateDiff(update) => {
 951                current_call.content.clear();
 952                current_call
 953                    .content
 954                    .push(ToolCallContent::Diff(update.diff));
 955            }
 956            ToolCallUpdate::UpdateTerminal(update) => {
 957                current_call.content.clear();
 958                current_call
 959                    .content
 960                    .push(ToolCallContent::Terminal(update.terminal));
 961            }
 962        }
 963
 964        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 965
 966        Ok(())
 967    }
 968
 969    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 970    pub fn upsert_tool_call(
 971        &mut self,
 972        tool_call: acp::ToolCall,
 973        cx: &mut Context<Self>,
 974    ) -> Result<(), acp::Error> {
 975        let status = tool_call.status.into();
 976        self.upsert_tool_call_inner(tool_call.into(), status, cx)
 977    }
 978
 979    /// Fails if id does not match an existing entry.
 980    pub fn upsert_tool_call_inner(
 981        &mut self,
 982        tool_call_update: acp::ToolCallUpdate,
 983        status: ToolCallStatus,
 984        cx: &mut Context<Self>,
 985    ) -> Result<(), acp::Error> {
 986        let language_registry = self.project.read(cx).languages().clone();
 987        let id = tool_call_update.id.clone();
 988
 989        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
 990            current_call.update_fields(tool_call_update.fields, language_registry, cx);
 991            current_call.status = status;
 992
 993            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 994        } else {
 995            let call =
 996                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
 997            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 998        };
 999
1000        self.resolve_locations(id, cx);
1001        Ok(())
1002    }
1003
1004    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1005        // The tool call we are looking for is typically the last one, or very close to the end.
1006        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1007        self.entries
1008            .iter_mut()
1009            .enumerate()
1010            .rev()
1011            .find_map(|(index, tool_call)| {
1012                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1013                    && &tool_call.id == id
1014                {
1015                    Some((index, tool_call))
1016                } else {
1017                    None
1018                }
1019            })
1020    }
1021
1022    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1023        let project = self.project.clone();
1024        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1025            return;
1026        };
1027        let task = tool_call.resolve_locations(project, cx);
1028        cx.spawn(async move |this, cx| {
1029            let resolved_locations = task.await;
1030            this.update(cx, |this, cx| {
1031                let project = this.project.clone();
1032                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1033                    return;
1034                };
1035                if let Some(Some(location)) = resolved_locations.last() {
1036                    project.update(cx, |project, cx| {
1037                        if let Some(agent_location) = project.agent_location() {
1038                            let should_ignore = agent_location.buffer == location.buffer
1039                                && location
1040                                    .buffer
1041                                    .update(cx, |buffer, _| {
1042                                        let snapshot = buffer.snapshot();
1043                                        let old_position =
1044                                            agent_location.position.to_point(&snapshot);
1045                                        let new_position = location.position.to_point(&snapshot);
1046                                        // ignore this so that when we get updates from the edit tool
1047                                        // the position doesn't reset to the startof line
1048                                        old_position.row == new_position.row
1049                                            && old_position.column > new_position.column
1050                                    })
1051                                    .ok()
1052                                    .unwrap_or_default();
1053                            if !should_ignore {
1054                                project.set_agent_location(Some(location.clone()), cx);
1055                            }
1056                        }
1057                    });
1058                }
1059                if tool_call.resolved_locations != resolved_locations {
1060                    tool_call.resolved_locations = resolved_locations;
1061                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1062                }
1063            })
1064        })
1065        .detach();
1066    }
1067
1068    pub fn request_tool_call_authorization(
1069        &mut self,
1070        tool_call: acp::ToolCallUpdate,
1071        options: Vec<acp::PermissionOption>,
1072        cx: &mut Context<Self>,
1073    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1074        let (tx, rx) = oneshot::channel();
1075
1076        let status = ToolCallStatus::WaitingForConfirmation {
1077            options,
1078            respond_tx: tx,
1079        };
1080
1081        self.upsert_tool_call_inner(tool_call, status, cx)?;
1082        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1083        Ok(rx)
1084    }
1085
1086    pub fn authorize_tool_call(
1087        &mut self,
1088        id: acp::ToolCallId,
1089        option_id: acp::PermissionOptionId,
1090        option_kind: acp::PermissionOptionKind,
1091        cx: &mut Context<Self>,
1092    ) {
1093        let Some((ix, call)) = self.tool_call_mut(&id) else {
1094            return;
1095        };
1096
1097        let new_status = match option_kind {
1098            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1099                ToolCallStatus::Rejected
1100            }
1101            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1102                ToolCallStatus::InProgress
1103            }
1104        };
1105
1106        let curr_status = mem::replace(&mut call.status, new_status);
1107
1108        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1109            respond_tx.send(option_id).log_err();
1110        } else if cfg!(debug_assertions) {
1111            panic!("tried to authorize an already authorized tool call");
1112        }
1113
1114        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1115    }
1116
1117    /// Returns true if the last turn is awaiting tool authorization
1118    pub fn waiting_for_tool_confirmation(&self) -> bool {
1119        for entry in self.entries.iter().rev() {
1120            match &entry {
1121                AgentThreadEntry::ToolCall(call) => match call.status {
1122                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1123                    ToolCallStatus::Pending
1124                    | ToolCallStatus::InProgress
1125                    | ToolCallStatus::Completed
1126                    | ToolCallStatus::Failed
1127                    | ToolCallStatus::Rejected
1128                    | ToolCallStatus::Canceled => continue,
1129                },
1130                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1131                    // Reached the beginning of the turn
1132                    return false;
1133                }
1134            }
1135        }
1136        false
1137    }
1138
1139    pub fn plan(&self) -> &Plan {
1140        &self.plan
1141    }
1142
1143    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1144        let new_entries_len = request.entries.len();
1145        let mut new_entries = request.entries.into_iter();
1146
1147        // Reuse existing markdown to prevent flickering
1148        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1149            let PlanEntry {
1150                content,
1151                priority,
1152                status,
1153            } = old;
1154            content.update(cx, |old, cx| {
1155                old.replace(new.content, cx);
1156            });
1157            *priority = new.priority;
1158            *status = new.status;
1159        }
1160        for new in new_entries {
1161            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1162        }
1163        self.plan.entries.truncate(new_entries_len);
1164
1165        cx.notify();
1166    }
1167
1168    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1169        self.plan
1170            .entries
1171            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1172        cx.notify();
1173    }
1174
1175    #[cfg(any(test, feature = "test-support"))]
1176    pub fn send_raw(
1177        &mut self,
1178        message: &str,
1179        cx: &mut Context<Self>,
1180    ) -> BoxFuture<'static, Result<()>> {
1181        self.send(
1182            vec![acp::ContentBlock::Text(acp::TextContent {
1183                text: message.to_string(),
1184                annotations: None,
1185            })],
1186            cx,
1187        )
1188    }
1189
1190    pub fn send(
1191        &mut self,
1192        message: Vec<acp::ContentBlock>,
1193        cx: &mut Context<Self>,
1194    ) -> BoxFuture<'static, Result<()>> {
1195        let block = ContentBlock::new_combined(
1196            message.clone(),
1197            self.project.read(cx).languages().clone(),
1198            cx,
1199        );
1200        let request = acp::PromptRequest {
1201            prompt: message.clone(),
1202            session_id: self.session_id.clone(),
1203        };
1204        let git_store = self.project.read(cx).git_store().clone();
1205
1206        let message_id = if self
1207            .connection
1208            .session_editor(&self.session_id, cx)
1209            .is_some()
1210        {
1211            Some(UserMessageId::new())
1212        } else {
1213            None
1214        };
1215        self.push_entry(
1216            AgentThreadEntry::UserMessage(UserMessage {
1217                id: message_id.clone(),
1218                content: block,
1219                chunks: message,
1220                checkpoint: None,
1221            }),
1222            cx,
1223        );
1224
1225        self.run_turn(cx, async move |this, cx| {
1226            let old_checkpoint = git_store
1227                .update(cx, |git, cx| git.checkpoint(cx))?
1228                .await
1229                .context("failed to get old checkpoint")
1230                .log_err();
1231            this.update(cx, |this, cx| {
1232                if let Some((_ix, message)) = this.last_user_message() {
1233                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1234                        git_checkpoint,
1235                        show: false,
1236                    });
1237                }
1238                this.connection.prompt(message_id, request, cx)
1239            })?
1240            .await
1241        })
1242    }
1243
1244    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1245        self.run_turn(cx, async move |this, cx| {
1246            this.update(cx, |this, cx| {
1247                this.connection
1248                    .resume(&this.session_id, cx)
1249                    .map(|resume| resume.run(cx))
1250            })?
1251            .context("resuming a session is not supported")?
1252            .await
1253        })
1254    }
1255
1256    fn run_turn(
1257        &mut self,
1258        cx: &mut Context<Self>,
1259        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1260    ) -> BoxFuture<'static, Result<()>> {
1261        self.clear_completed_plan_entries(cx);
1262
1263        let (tx, rx) = oneshot::channel();
1264        let cancel_task = self.cancel(cx);
1265
1266        self.send_task = Some(cx.spawn(async move |this, cx| {
1267            cancel_task.await;
1268            tx.send(f(this, cx).await).ok();
1269        }));
1270
1271        cx.spawn(async move |this, cx| {
1272            let response = rx.await;
1273
1274            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1275                .await?;
1276
1277            this.update(cx, |this, cx| {
1278                match response {
1279                    Ok(Err(e)) => {
1280                        this.send_task.take();
1281                        cx.emit(AcpThreadEvent::Error);
1282                        Err(e)
1283                    }
1284                    result => {
1285                        let canceled = matches!(
1286                            result,
1287                            Ok(Ok(acp::PromptResponse {
1288                                stop_reason: acp::StopReason::Canceled
1289                            }))
1290                        );
1291
1292                        // We only take the task if the current prompt wasn't canceled.
1293                        //
1294                        // This prompt may have been canceled because another one was sent
1295                        // while it was still generating. In these cases, dropping `send_task`
1296                        // would cause the next generation to be canceled.
1297                        if !canceled {
1298                            this.send_task.take();
1299                        }
1300
1301                        cx.emit(AcpThreadEvent::Stopped);
1302                        Ok(())
1303                    }
1304                }
1305            })?
1306        })
1307        .boxed()
1308    }
1309
1310    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1311        let Some(send_task) = self.send_task.take() else {
1312            return Task::ready(());
1313        };
1314
1315        for entry in self.entries.iter_mut() {
1316            if let AgentThreadEntry::ToolCall(call) = entry {
1317                let cancel = matches!(
1318                    call.status,
1319                    ToolCallStatus::Pending
1320                        | ToolCallStatus::WaitingForConfirmation { .. }
1321                        | ToolCallStatus::InProgress
1322                );
1323
1324                if cancel {
1325                    call.status = ToolCallStatus::Canceled;
1326                }
1327            }
1328        }
1329
1330        self.connection.cancel(&self.session_id, cx);
1331
1332        // Wait for the send task to complete
1333        cx.foreground_executor().spawn(send_task)
1334    }
1335
1336    /// Rewinds this thread to before the entry at `index`, removing it and all
1337    /// subsequent entries while reverting any changes made from that point.
1338    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1339        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1340            return Task::ready(Err(anyhow!("not supported")));
1341        };
1342        let Some(message) = self.user_message(&id) else {
1343            return Task::ready(Err(anyhow!("message not found")));
1344        };
1345
1346        let checkpoint = message
1347            .checkpoint
1348            .as_ref()
1349            .map(|c| c.git_checkpoint.clone());
1350
1351        let git_store = self.project.read(cx).git_store().clone();
1352        cx.spawn(async move |this, cx| {
1353            if let Some(checkpoint) = checkpoint {
1354                git_store
1355                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1356                    .await?;
1357            }
1358
1359            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1360                .await?;
1361            this.update(cx, |this, cx| {
1362                if let Some((ix, _)) = this.user_message_mut(&id) {
1363                    let range = ix..this.entries.len();
1364                    this.entries.truncate(ix);
1365                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1366                }
1367            })
1368        })
1369    }
1370
1371    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1372        let git_store = self.project.read(cx).git_store().clone();
1373
1374        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1375            if let Some(checkpoint) = message.checkpoint.as_ref() {
1376                checkpoint.git_checkpoint.clone()
1377            } else {
1378                return Task::ready(Ok(()));
1379            }
1380        } else {
1381            return Task::ready(Ok(()));
1382        };
1383
1384        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1385        cx.spawn(async move |this, cx| {
1386            let new_checkpoint = new_checkpoint
1387                .await
1388                .context("failed to get new checkpoint")
1389                .log_err();
1390            if let Some(new_checkpoint) = new_checkpoint {
1391                let equal = git_store
1392                    .update(cx, |git, cx| {
1393                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1394                    })?
1395                    .await
1396                    .unwrap_or(true);
1397                this.update(cx, |this, cx| {
1398                    let (ix, message) = this.last_user_message().context("no user message")?;
1399                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1400                    checkpoint.show = !equal;
1401                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1402                    anyhow::Ok(())
1403                })??;
1404            }
1405
1406            Ok(())
1407        })
1408    }
1409
1410    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1411        self.entries
1412            .iter_mut()
1413            .enumerate()
1414            .rev()
1415            .find_map(|(ix, entry)| {
1416                if let AgentThreadEntry::UserMessage(message) = entry {
1417                    Some((ix, message))
1418                } else {
1419                    None
1420                }
1421            })
1422    }
1423
1424    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1425        self.entries.iter().find_map(|entry| {
1426            if let AgentThreadEntry::UserMessage(message) = entry {
1427                if message.id.as_ref() == Some(&id) {
1428                    Some(message)
1429                } else {
1430                    None
1431                }
1432            } else {
1433                None
1434            }
1435        })
1436    }
1437
1438    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1439        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1440            if let AgentThreadEntry::UserMessage(message) = entry {
1441                if message.id.as_ref() == Some(&id) {
1442                    Some((ix, message))
1443                } else {
1444                    None
1445                }
1446            } else {
1447                None
1448            }
1449        })
1450    }
1451
1452    pub fn read_text_file(
1453        &self,
1454        path: PathBuf,
1455        line: Option<u32>,
1456        limit: Option<u32>,
1457        reuse_shared_snapshot: bool,
1458        cx: &mut Context<Self>,
1459    ) -> Task<Result<String>> {
1460        let project = self.project.clone();
1461        let action_log = self.action_log.clone();
1462        cx.spawn(async move |this, cx| {
1463            let load = project.update(cx, |project, cx| {
1464                let path = project
1465                    .project_path_for_absolute_path(&path, cx)
1466                    .context("invalid path")?;
1467                anyhow::Ok(project.open_buffer(path, cx))
1468            });
1469            let buffer = load??.await?;
1470
1471            let snapshot = if reuse_shared_snapshot {
1472                this.read_with(cx, |this, _| {
1473                    this.shared_buffers.get(&buffer.clone()).cloned()
1474                })
1475                .log_err()
1476                .flatten()
1477            } else {
1478                None
1479            };
1480
1481            let snapshot = if let Some(snapshot) = snapshot {
1482                snapshot
1483            } else {
1484                action_log.update(cx, |action_log, cx| {
1485                    action_log.buffer_read(buffer.clone(), cx);
1486                })?;
1487                project.update(cx, |project, cx| {
1488                    let position = buffer
1489                        .read(cx)
1490                        .snapshot()
1491                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1492                    project.set_agent_location(
1493                        Some(AgentLocation {
1494                            buffer: buffer.downgrade(),
1495                            position,
1496                        }),
1497                        cx,
1498                    );
1499                })?;
1500
1501                buffer.update(cx, |buffer, _| buffer.snapshot())?
1502            };
1503
1504            this.update(cx, |this, _| {
1505                let text = snapshot.text();
1506                this.shared_buffers.insert(buffer.clone(), snapshot);
1507                if line.is_none() && limit.is_none() {
1508                    return Ok(text);
1509                }
1510                let limit = limit.unwrap_or(u32::MAX) as usize;
1511                let Some(line) = line else {
1512                    return Ok(text.lines().take(limit).collect::<String>());
1513                };
1514
1515                let count = text.lines().count();
1516                if count < line as usize {
1517                    anyhow::bail!("There are only {} lines", count);
1518                }
1519                Ok(text
1520                    .lines()
1521                    .skip(line as usize + 1)
1522                    .take(limit)
1523                    .collect::<String>())
1524            })?
1525        })
1526    }
1527
1528    pub fn write_text_file(
1529        &self,
1530        path: PathBuf,
1531        content: String,
1532        cx: &mut Context<Self>,
1533    ) -> Task<Result<()>> {
1534        let project = self.project.clone();
1535        let action_log = self.action_log.clone();
1536        cx.spawn(async move |this, cx| {
1537            let load = project.update(cx, |project, cx| {
1538                let path = project
1539                    .project_path_for_absolute_path(&path, cx)
1540                    .context("invalid path")?;
1541                anyhow::Ok(project.open_buffer(path, cx))
1542            });
1543            let buffer = load??.await?;
1544            let snapshot = this.update(cx, |this, cx| {
1545                this.shared_buffers
1546                    .get(&buffer)
1547                    .cloned()
1548                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1549            })?;
1550            let edits = cx
1551                .background_executor()
1552                .spawn(async move {
1553                    let old_text = snapshot.text();
1554                    text_diff(old_text.as_str(), &content)
1555                        .into_iter()
1556                        .map(|(range, replacement)| {
1557                            (
1558                                snapshot.anchor_after(range.start)
1559                                    ..snapshot.anchor_before(range.end),
1560                                replacement,
1561                            )
1562                        })
1563                        .collect::<Vec<_>>()
1564                })
1565                .await;
1566            cx.update(|cx| {
1567                project.update(cx, |project, cx| {
1568                    project.set_agent_location(
1569                        Some(AgentLocation {
1570                            buffer: buffer.downgrade(),
1571                            position: edits
1572                                .last()
1573                                .map(|(range, _)| range.end)
1574                                .unwrap_or(Anchor::MIN),
1575                        }),
1576                        cx,
1577                    );
1578                });
1579
1580                action_log.update(cx, |action_log, cx| {
1581                    action_log.buffer_read(buffer.clone(), cx);
1582                });
1583                buffer.update(cx, |buffer, cx| {
1584                    buffer.edit(edits, None, cx);
1585                });
1586                action_log.update(cx, |action_log, cx| {
1587                    action_log.buffer_edited(buffer.clone(), cx);
1588                });
1589            })?;
1590            project
1591                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1592                .await
1593        })
1594    }
1595
1596    pub fn to_markdown(&self, cx: &App) -> String {
1597        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1598    }
1599
1600    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1601        cx.emit(AcpThreadEvent::ServerExited(status));
1602    }
1603}
1604
1605fn markdown_for_raw_output(
1606    raw_output: &serde_json::Value,
1607    language_registry: &Arc<LanguageRegistry>,
1608    cx: &mut App,
1609) -> Option<Entity<Markdown>> {
1610    match raw_output {
1611        serde_json::Value::Null => None,
1612        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1613            Markdown::new(
1614                value.to_string().into(),
1615                Some(language_registry.clone()),
1616                None,
1617                cx,
1618            )
1619        })),
1620        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1621            Markdown::new(
1622                value.to_string().into(),
1623                Some(language_registry.clone()),
1624                None,
1625                cx,
1626            )
1627        })),
1628        serde_json::Value::String(value) => Some(cx.new(|cx| {
1629            Markdown::new(
1630                value.clone().into(),
1631                Some(language_registry.clone()),
1632                None,
1633                cx,
1634            )
1635        })),
1636        value => Some(cx.new(|cx| {
1637            Markdown::new(
1638                format!("```json\n{}\n```", value).into(),
1639                Some(language_registry.clone()),
1640                None,
1641                cx,
1642            )
1643        })),
1644    }
1645}
1646
1647#[cfg(test)]
1648mod tests {
1649    use super::*;
1650    use anyhow::anyhow;
1651    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1652    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1653    use indoc::indoc;
1654    use project::{FakeFs, Fs};
1655    use rand::Rng as _;
1656    use serde_json::json;
1657    use settings::SettingsStore;
1658    use smol::stream::StreamExt as _;
1659    use std::{
1660        any::Any,
1661        cell::RefCell,
1662        path::Path,
1663        rc::Rc,
1664        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1665        time::Duration,
1666    };
1667    use util::path;
1668
1669    fn init_test(cx: &mut TestAppContext) {
1670        env_logger::try_init().ok();
1671        cx.update(|cx| {
1672            let settings_store = SettingsStore::test(cx);
1673            cx.set_global(settings_store);
1674            Project::init_settings(cx);
1675            language::init(cx);
1676        });
1677    }
1678
1679    #[gpui::test]
1680    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1681        init_test(cx);
1682
1683        let fs = FakeFs::new(cx.executor());
1684        let project = Project::test(fs, [], cx).await;
1685        let connection = Rc::new(FakeAgentConnection::new());
1686        let thread = cx
1687            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1688            .await
1689            .unwrap();
1690
1691        // Test creating a new user message
1692        thread.update(cx, |thread, cx| {
1693            thread.push_user_content_block(
1694                None,
1695                acp::ContentBlock::Text(acp::TextContent {
1696                    annotations: None,
1697                    text: "Hello, ".to_string(),
1698                }),
1699                cx,
1700            );
1701        });
1702
1703        thread.update(cx, |thread, cx| {
1704            assert_eq!(thread.entries.len(), 1);
1705            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1706                assert_eq!(user_msg.id, None);
1707                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1708            } else {
1709                panic!("Expected UserMessage");
1710            }
1711        });
1712
1713        // Test appending to existing user message
1714        let message_1_id = UserMessageId::new();
1715        thread.update(cx, |thread, cx| {
1716            thread.push_user_content_block(
1717                Some(message_1_id.clone()),
1718                acp::ContentBlock::Text(acp::TextContent {
1719                    annotations: None,
1720                    text: "world!".to_string(),
1721                }),
1722                cx,
1723            );
1724        });
1725
1726        thread.update(cx, |thread, cx| {
1727            assert_eq!(thread.entries.len(), 1);
1728            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1729                assert_eq!(user_msg.id, Some(message_1_id));
1730                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1731            } else {
1732                panic!("Expected UserMessage");
1733            }
1734        });
1735
1736        // Test creating new user message after assistant message
1737        thread.update(cx, |thread, cx| {
1738            thread.push_assistant_content_block(
1739                acp::ContentBlock::Text(acp::TextContent {
1740                    annotations: None,
1741                    text: "Assistant response".to_string(),
1742                }),
1743                false,
1744                cx,
1745            );
1746        });
1747
1748        let message_2_id = UserMessageId::new();
1749        thread.update(cx, |thread, cx| {
1750            thread.push_user_content_block(
1751                Some(message_2_id.clone()),
1752                acp::ContentBlock::Text(acp::TextContent {
1753                    annotations: None,
1754                    text: "New user message".to_string(),
1755                }),
1756                cx,
1757            );
1758        });
1759
1760        thread.update(cx, |thread, cx| {
1761            assert_eq!(thread.entries.len(), 3);
1762            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1763                assert_eq!(user_msg.id, Some(message_2_id));
1764                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1765            } else {
1766                panic!("Expected UserMessage at index 2");
1767            }
1768        });
1769    }
1770
1771    #[gpui::test]
1772    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1773        init_test(cx);
1774
1775        let fs = FakeFs::new(cx.executor());
1776        let project = Project::test(fs, [], cx).await;
1777        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1778            |_, thread, mut cx| {
1779                async move {
1780                    thread.update(&mut cx, |thread, cx| {
1781                        thread
1782                            .handle_session_update(
1783                                acp::SessionUpdate::AgentThoughtChunk {
1784                                    content: "Thinking ".into(),
1785                                },
1786                                cx,
1787                            )
1788                            .unwrap();
1789                        thread
1790                            .handle_session_update(
1791                                acp::SessionUpdate::AgentThoughtChunk {
1792                                    content: "hard!".into(),
1793                                },
1794                                cx,
1795                            )
1796                            .unwrap();
1797                    })?;
1798                    Ok(acp::PromptResponse {
1799                        stop_reason: acp::StopReason::EndTurn,
1800                    })
1801                }
1802                .boxed_local()
1803            },
1804        ));
1805
1806        let thread = cx
1807            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1808            .await
1809            .unwrap();
1810
1811        thread
1812            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1813            .await
1814            .unwrap();
1815
1816        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1817        assert_eq!(
1818            output,
1819            indoc! {r#"
1820            ## User
1821
1822            Hello from Zed!
1823
1824            ## Assistant
1825
1826            <thinking>
1827            Thinking hard!
1828            </thinking>
1829
1830            "#}
1831        );
1832    }
1833
1834    #[gpui::test]
1835    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1836        init_test(cx);
1837
1838        let fs = FakeFs::new(cx.executor());
1839        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1840            .await;
1841        let project = Project::test(fs.clone(), [], cx).await;
1842        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1843        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1844        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1845            move |_, thread, mut cx| {
1846                let read_file_tx = read_file_tx.clone();
1847                async move {
1848                    let content = thread
1849                        .update(&mut cx, |thread, cx| {
1850                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1851                        })
1852                        .unwrap()
1853                        .await
1854                        .unwrap();
1855                    assert_eq!(content, "one\ntwo\nthree\n");
1856                    read_file_tx.take().unwrap().send(()).unwrap();
1857                    thread
1858                        .update(&mut cx, |thread, cx| {
1859                            thread.write_text_file(
1860                                path!("/tmp/foo").into(),
1861                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1862                                cx,
1863                            )
1864                        })
1865                        .unwrap()
1866                        .await
1867                        .unwrap();
1868                    Ok(acp::PromptResponse {
1869                        stop_reason: acp::StopReason::EndTurn,
1870                    })
1871                }
1872                .boxed_local()
1873            },
1874        ));
1875
1876        let (worktree, pathbuf) = project
1877            .update(cx, |project, cx| {
1878                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1879            })
1880            .await
1881            .unwrap();
1882        let buffer = project
1883            .update(cx, |project, cx| {
1884                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1885            })
1886            .await
1887            .unwrap();
1888
1889        let thread = cx
1890            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1891            .await
1892            .unwrap();
1893
1894        let request = thread.update(cx, |thread, cx| {
1895            thread.send_raw("Extend the count in /tmp/foo", cx)
1896        });
1897        read_file_rx.await.ok();
1898        buffer.update(cx, |buffer, cx| {
1899            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1900        });
1901        cx.run_until_parked();
1902        assert_eq!(
1903            buffer.read_with(cx, |buffer, _| buffer.text()),
1904            "zero\none\ntwo\nthree\nfour\nfive\n"
1905        );
1906        assert_eq!(
1907            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1908            "zero\none\ntwo\nthree\nfour\nfive\n"
1909        );
1910        request.await.unwrap();
1911    }
1912
1913    #[gpui::test]
1914    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1915        init_test(cx);
1916
1917        let fs = FakeFs::new(cx.executor());
1918        let project = Project::test(fs, [], cx).await;
1919        let id = acp::ToolCallId("test".into());
1920
1921        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1922            let id = id.clone();
1923            move |_, thread, mut cx| {
1924                let id = id.clone();
1925                async move {
1926                    thread
1927                        .update(&mut cx, |thread, cx| {
1928                            thread.handle_session_update(
1929                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1930                                    id: id.clone(),
1931                                    title: "Label".into(),
1932                                    kind: acp::ToolKind::Fetch,
1933                                    status: acp::ToolCallStatus::InProgress,
1934                                    content: vec![],
1935                                    locations: vec![],
1936                                    raw_input: None,
1937                                    raw_output: None,
1938                                }),
1939                                cx,
1940                            )
1941                        })
1942                        .unwrap()
1943                        .unwrap();
1944                    Ok(acp::PromptResponse {
1945                        stop_reason: acp::StopReason::EndTurn,
1946                    })
1947                }
1948                .boxed_local()
1949            }
1950        }));
1951
1952        let thread = cx
1953            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1954            .await
1955            .unwrap();
1956
1957        let request = thread.update(cx, |thread, cx| {
1958            thread.send_raw("Fetch https://example.com", cx)
1959        });
1960
1961        run_until_first_tool_call(&thread, cx).await;
1962
1963        thread.read_with(cx, |thread, _| {
1964            assert!(matches!(
1965                thread.entries[1],
1966                AgentThreadEntry::ToolCall(ToolCall {
1967                    status: ToolCallStatus::InProgress,
1968                    ..
1969                })
1970            ));
1971        });
1972
1973        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1974
1975        thread.read_with(cx, |thread, _| {
1976            assert!(matches!(
1977                &thread.entries[1],
1978                AgentThreadEntry::ToolCall(ToolCall {
1979                    status: ToolCallStatus::Canceled,
1980                    ..
1981                })
1982            ));
1983        });
1984
1985        thread
1986            .update(cx, |thread, cx| {
1987                thread.handle_session_update(
1988                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1989                        id,
1990                        fields: acp::ToolCallUpdateFields {
1991                            status: Some(acp::ToolCallStatus::Completed),
1992                            ..Default::default()
1993                        },
1994                    }),
1995                    cx,
1996                )
1997            })
1998            .unwrap();
1999
2000        request.await.unwrap();
2001
2002        thread.read_with(cx, |thread, _| {
2003            assert!(matches!(
2004                thread.entries[1],
2005                AgentThreadEntry::ToolCall(ToolCall {
2006                    status: ToolCallStatus::Completed,
2007                    ..
2008                })
2009            ));
2010        });
2011    }
2012
2013    #[gpui::test]
2014    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2015        init_test(cx);
2016        let fs = FakeFs::new(cx.background_executor.clone());
2017        fs.insert_tree(path!("/test"), json!({})).await;
2018        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2019
2020        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2021            move |_, thread, mut cx| {
2022                async move {
2023                    thread
2024                        .update(&mut cx, |thread, cx| {
2025                            thread.handle_session_update(
2026                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2027                                    id: acp::ToolCallId("test".into()),
2028                                    title: "Label".into(),
2029                                    kind: acp::ToolKind::Edit,
2030                                    status: acp::ToolCallStatus::Completed,
2031                                    content: vec![acp::ToolCallContent::Diff {
2032                                        diff: acp::Diff {
2033                                            path: "/test/test.txt".into(),
2034                                            old_text: None,
2035                                            new_text: "foo".into(),
2036                                        },
2037                                    }],
2038                                    locations: vec![],
2039                                    raw_input: None,
2040                                    raw_output: None,
2041                                }),
2042                                cx,
2043                            )
2044                        })
2045                        .unwrap()
2046                        .unwrap();
2047                    Ok(acp::PromptResponse {
2048                        stop_reason: acp::StopReason::EndTurn,
2049                    })
2050                }
2051                .boxed_local()
2052            }
2053        }));
2054
2055        let thread = cx
2056            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2057            .await
2058            .unwrap();
2059
2060        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2061            .await
2062            .unwrap();
2063
2064        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2065    }
2066
2067    #[gpui::test(iterations = 10)]
2068    async fn test_checkpoints(cx: &mut TestAppContext) {
2069        init_test(cx);
2070        let fs = FakeFs::new(cx.background_executor.clone());
2071        fs.insert_tree(
2072            path!("/test"),
2073            json!({
2074                ".git": {}
2075            }),
2076        )
2077        .await;
2078        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2079
2080        let simulate_changes = Arc::new(AtomicBool::new(true));
2081        let next_filename = Arc::new(AtomicUsize::new(0));
2082        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2083            let simulate_changes = simulate_changes.clone();
2084            let next_filename = next_filename.clone();
2085            let fs = fs.clone();
2086            move |request, thread, mut cx| {
2087                let fs = fs.clone();
2088                let simulate_changes = simulate_changes.clone();
2089                let next_filename = next_filename.clone();
2090                async move {
2091                    if simulate_changes.load(SeqCst) {
2092                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2093                        fs.write(Path::new(&filename), b"").await?;
2094                    }
2095
2096                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2097                        panic!("expected text content block");
2098                    };
2099                    thread.update(&mut cx, |thread, cx| {
2100                        thread
2101                            .handle_session_update(
2102                                acp::SessionUpdate::AgentMessageChunk {
2103                                    content: content.text.to_uppercase().into(),
2104                                },
2105                                cx,
2106                            )
2107                            .unwrap();
2108                    })?;
2109                    Ok(acp::PromptResponse {
2110                        stop_reason: acp::StopReason::EndTurn,
2111                    })
2112                }
2113                .boxed_local()
2114            }
2115        }));
2116        let thread = cx
2117            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2118            .await
2119            .unwrap();
2120
2121        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2122            .await
2123            .unwrap();
2124        thread.read_with(cx, |thread, cx| {
2125            assert_eq!(
2126                thread.to_markdown(cx),
2127                indoc! {"
2128                    ## User (checkpoint)
2129
2130                    Lorem
2131
2132                    ## Assistant
2133
2134                    LOREM
2135
2136                "}
2137            );
2138        });
2139        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2140
2141        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2142            .await
2143            .unwrap();
2144        thread.read_with(cx, |thread, cx| {
2145            assert_eq!(
2146                thread.to_markdown(cx),
2147                indoc! {"
2148                    ## User (checkpoint)
2149
2150                    Lorem
2151
2152                    ## Assistant
2153
2154                    LOREM
2155
2156                    ## User (checkpoint)
2157
2158                    ipsum
2159
2160                    ## Assistant
2161
2162                    IPSUM
2163
2164                "}
2165            );
2166        });
2167        assert_eq!(
2168            fs.files(),
2169            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2170        );
2171
2172        // Checkpoint isn't stored when there are no changes.
2173        simulate_changes.store(false, SeqCst);
2174        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2175            .await
2176            .unwrap();
2177        thread.read_with(cx, |thread, cx| {
2178            assert_eq!(
2179                thread.to_markdown(cx),
2180                indoc! {"
2181                    ## User (checkpoint)
2182
2183                    Lorem
2184
2185                    ## Assistant
2186
2187                    LOREM
2188
2189                    ## User (checkpoint)
2190
2191                    ipsum
2192
2193                    ## Assistant
2194
2195                    IPSUM
2196
2197                    ## User
2198
2199                    dolor
2200
2201                    ## Assistant
2202
2203                    DOLOR
2204
2205                "}
2206            );
2207        });
2208        assert_eq!(
2209            fs.files(),
2210            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2211        );
2212
2213        // Rewinding the conversation truncates the history and restores the checkpoint.
2214        thread
2215            .update(cx, |thread, cx| {
2216                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2217                    panic!("unexpected entries {:?}", thread.entries)
2218                };
2219                thread.rewind(message.id.clone().unwrap(), cx)
2220            })
2221            .await
2222            .unwrap();
2223        thread.read_with(cx, |thread, cx| {
2224            assert_eq!(
2225                thread.to_markdown(cx),
2226                indoc! {"
2227                    ## User (checkpoint)
2228
2229                    Lorem
2230
2231                    ## Assistant
2232
2233                    LOREM
2234
2235                "}
2236            );
2237        });
2238        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2239    }
2240
2241    async fn run_until_first_tool_call(
2242        thread: &Entity<AcpThread>,
2243        cx: &mut TestAppContext,
2244    ) -> usize {
2245        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2246
2247        let subscription = cx.update(|cx| {
2248            cx.subscribe(thread, move |thread, _, cx| {
2249                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2250                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2251                        return tx.try_send(ix).unwrap();
2252                    }
2253                }
2254            })
2255        });
2256
2257        select! {
2258            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2259                panic!("Timeout waiting for tool call")
2260            }
2261            ix = rx.next().fuse() => {
2262                drop(subscription);
2263                ix.unwrap()
2264            }
2265        }
2266    }
2267
2268    #[derive(Clone, Default)]
2269    struct FakeAgentConnection {
2270        auth_methods: Vec<acp::AuthMethod>,
2271        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2272        on_user_message: Option<
2273            Rc<
2274                dyn Fn(
2275                        acp::PromptRequest,
2276                        WeakEntity<AcpThread>,
2277                        AsyncApp,
2278                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2279                    + 'static,
2280            >,
2281        >,
2282    }
2283
2284    impl FakeAgentConnection {
2285        fn new() -> Self {
2286            Self {
2287                auth_methods: Vec::new(),
2288                on_user_message: None,
2289                sessions: Arc::default(),
2290            }
2291        }
2292
2293        #[expect(unused)]
2294        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2295            self.auth_methods = auth_methods;
2296            self
2297        }
2298
2299        fn on_user_message(
2300            mut self,
2301            handler: impl Fn(
2302                acp::PromptRequest,
2303                WeakEntity<AcpThread>,
2304                AsyncApp,
2305            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2306            + 'static,
2307        ) -> Self {
2308            self.on_user_message.replace(Rc::new(handler));
2309            self
2310        }
2311    }
2312
2313    impl AgentConnection for FakeAgentConnection {
2314        fn auth_methods(&self) -> &[acp::AuthMethod] {
2315            &self.auth_methods
2316        }
2317
2318        fn new_thread(
2319            self: Rc<Self>,
2320            project: Entity<Project>,
2321            _cwd: &Path,
2322            cx: &mut App,
2323        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2324            let session_id = acp::SessionId(
2325                rand::thread_rng()
2326                    .sample_iter(&rand::distributions::Alphanumeric)
2327                    .take(7)
2328                    .map(char::from)
2329                    .collect::<String>()
2330                    .into(),
2331            );
2332            let thread =
2333                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2334            self.sessions.lock().insert(session_id, thread.downgrade());
2335            Task::ready(Ok(thread))
2336        }
2337
2338        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2339            if self.auth_methods().iter().any(|m| m.id == method) {
2340                Task::ready(Ok(()))
2341            } else {
2342                Task::ready(Err(anyhow!("Invalid Auth Method")))
2343            }
2344        }
2345
2346        fn prompt(
2347            &self,
2348            _id: Option<UserMessageId>,
2349            params: acp::PromptRequest,
2350            cx: &mut App,
2351        ) -> Task<gpui::Result<acp::PromptResponse>> {
2352            let sessions = self.sessions.lock();
2353            let thread = sessions.get(&params.session_id).unwrap();
2354            if let Some(handler) = &self.on_user_message {
2355                let handler = handler.clone();
2356                let thread = thread.clone();
2357                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2358            } else {
2359                Task::ready(Ok(acp::PromptResponse {
2360                    stop_reason: acp::StopReason::EndTurn,
2361                }))
2362            }
2363        }
2364
2365        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2366            let sessions = self.sessions.lock();
2367            let thread = sessions.get(&session_id).unwrap().clone();
2368
2369            cx.spawn(async move |cx| {
2370                thread
2371                    .update(cx, |thread, cx| thread.cancel(cx))
2372                    .unwrap()
2373                    .await
2374            })
2375            .detach();
2376        }
2377
2378        fn session_editor(
2379            &self,
2380            session_id: &acp::SessionId,
2381            _cx: &mut App,
2382        ) -> Option<Rc<dyn AgentSessionEditor>> {
2383            Some(Rc::new(FakeAgentSessionEditor {
2384                _session_id: session_id.clone(),
2385            }))
2386        }
2387
2388        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2389            self
2390        }
2391    }
2392
2393    struct FakeAgentSessionEditor {
2394        _session_id: acp::SessionId,
2395    }
2396
2397    impl AgentSessionEditor for FakeAgentSessionEditor {
2398        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2399            Task::ready(Ok(()))
2400        }
2401    }
2402}