acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9use serde::{Deserialize, Serialize};
  10pub use terminal::*;
  11
  12use action_log::ActionLog;
  13use agent_client_protocol as acp;
  14use anyhow::{Context as _, Result, anyhow};
  15use chrono::{DateTime, Utc};
  16use editor::Bias;
  17use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  18use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  19use itertools::Itertools;
  20use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  21use markdown::Markdown;
  22use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  23use std::collections::HashMap;
  24use std::error::Error;
  25use std::fmt::{Formatter, Write};
  26use std::ops::Range;
  27use std::process::ExitStatus;
  28use std::rc::Rc;
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use ui::App;
  31use util::ResultExt;
  32
  33#[derive(Debug)]
  34pub struct UserMessage {
  35    pub id: Option<UserMessageId>,
  36    pub content: ContentBlock,
  37    pub chunks: Vec<acp::ContentBlock>,
  38    pub checkpoint: Option<Checkpoint>,
  39}
  40
  41#[derive(Debug)]
  42pub struct Checkpoint {
  43    git_checkpoint: GitStoreCheckpoint,
  44    pub show: bool,
  45}
  46
  47impl UserMessage {
  48    fn to_markdown(&self, cx: &App) -> String {
  49        let mut markdown = String::new();
  50        if self
  51            .checkpoint
  52            .as_ref()
  53            .map_or(false, |checkpoint| checkpoint.show)
  54        {
  55            writeln!(markdown, "## User (checkpoint)").unwrap();
  56        } else {
  57            writeln!(markdown, "## User").unwrap();
  58        }
  59        writeln!(markdown).unwrap();
  60        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  61        writeln!(markdown).unwrap();
  62        markdown
  63    }
  64}
  65
  66#[derive(Debug, PartialEq)]
  67pub struct AssistantMessage {
  68    pub chunks: Vec<AssistantMessageChunk>,
  69}
  70
  71impl AssistantMessage {
  72    pub fn to_markdown(&self, cx: &App) -> String {
  73        format!(
  74            "## Assistant\n\n{}\n\n",
  75            self.chunks
  76                .iter()
  77                .map(|chunk| chunk.to_markdown(cx))
  78                .join("\n\n")
  79        )
  80    }
  81}
  82
  83#[derive(Debug, PartialEq)]
  84pub enum AssistantMessageChunk {
  85    Message { block: ContentBlock },
  86    Thought { block: ContentBlock },
  87}
  88
  89impl AssistantMessageChunk {
  90    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  91        Self::Message {
  92            block: ContentBlock::new(chunk.into(), language_registry, cx),
  93        }
  94    }
  95
  96    fn to_markdown(&self, cx: &App) -> String {
  97        match self {
  98            Self::Message { block } => block.to_markdown(cx).to_string(),
  99            Self::Thought { block } => {
 100                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 101            }
 102        }
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub enum AgentThreadEntry {
 108    UserMessage(UserMessage),
 109    AssistantMessage(AssistantMessage),
 110    ToolCall(ToolCall),
 111}
 112
 113impl AgentThreadEntry {
 114    pub fn to_markdown(&self, cx: &App) -> String {
 115        match self {
 116            Self::UserMessage(message) => message.to_markdown(cx),
 117            Self::AssistantMessage(message) => message.to_markdown(cx),
 118            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 119        }
 120    }
 121
 122    pub fn user_message(&self) -> Option<&UserMessage> {
 123        if let AgentThreadEntry::UserMessage(message) = self {
 124            Some(message)
 125        } else {
 126            None
 127        }
 128    }
 129
 130    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 131        if let AgentThreadEntry::ToolCall(call) = self {
 132            itertools::Either::Left(call.diffs())
 133        } else {
 134            itertools::Either::Right(std::iter::empty())
 135        }
 136    }
 137
 138    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 139        if let AgentThreadEntry::ToolCall(call) = self {
 140            itertools::Either::Left(call.terminals())
 141        } else {
 142            itertools::Either::Right(std::iter::empty())
 143        }
 144    }
 145
 146    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 147        if let AgentThreadEntry::ToolCall(ToolCall {
 148            locations,
 149            resolved_locations,
 150            ..
 151        }) = self
 152        {
 153            Some((
 154                locations.get(ix)?.clone(),
 155                resolved_locations.get(ix)?.clone()?,
 156            ))
 157        } else {
 158            None
 159        }
 160    }
 161}
 162
 163#[derive(Debug)]
 164pub struct ToolCall {
 165    pub id: acp::ToolCallId,
 166    pub label: Entity<Markdown>,
 167    pub kind: acp::ToolKind,
 168    pub content: Vec<ToolCallContent>,
 169    pub status: ToolCallStatus,
 170    pub locations: Vec<acp::ToolCallLocation>,
 171    pub resolved_locations: Vec<Option<AgentLocation>>,
 172    pub raw_input: Option<serde_json::Value>,
 173    pub raw_output: Option<serde_json::Value>,
 174}
 175
 176impl ToolCall {
 177    fn from_acp(
 178        tool_call: acp::ToolCall,
 179        status: ToolCallStatus,
 180        language_registry: Arc<LanguageRegistry>,
 181        cx: &mut App,
 182    ) -> Self {
 183        Self {
 184            id: tool_call.id,
 185            label: cx.new(|cx| {
 186                Markdown::new(
 187                    tool_call.title.into(),
 188                    Some(language_registry.clone()),
 189                    None,
 190                    cx,
 191                )
 192            }),
 193            kind: tool_call.kind,
 194            content: tool_call
 195                .content
 196                .into_iter()
 197                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 198                .collect(),
 199            locations: tool_call.locations,
 200            resolved_locations: Vec::default(),
 201            status,
 202            raw_input: tool_call.raw_input,
 203            raw_output: tool_call.raw_output,
 204        }
 205    }
 206
 207    fn update_fields(
 208        &mut self,
 209        fields: acp::ToolCallUpdateFields,
 210        language_registry: Arc<LanguageRegistry>,
 211        cx: &mut App,
 212    ) {
 213        let acp::ToolCallUpdateFields {
 214            kind,
 215            status,
 216            title,
 217            content,
 218            locations,
 219            raw_input,
 220            raw_output,
 221        } = fields;
 222
 223        if let Some(kind) = kind {
 224            self.kind = kind;
 225        }
 226
 227        if let Some(status) = status {
 228            self.status = status.into();
 229        }
 230
 231        if let Some(title) = title {
 232            self.label.update(cx, |label, cx| {
 233                label.replace(title, cx);
 234            });
 235        }
 236
 237        if let Some(content) = content {
 238            self.content = content
 239                .into_iter()
 240                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 241                .collect();
 242        }
 243
 244        if let Some(locations) = locations {
 245            self.locations = locations;
 246        }
 247
 248        if let Some(raw_input) = raw_input {
 249            self.raw_input = Some(raw_input);
 250        }
 251
 252        if let Some(raw_output) = raw_output {
 253            if self.content.is_empty() {
 254                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 255                {
 256                    self.content
 257                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 258                            markdown,
 259                        }));
 260                }
 261            }
 262            self.raw_output = Some(raw_output);
 263        }
 264    }
 265
 266    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 267        self.content.iter().filter_map(|content| match content {
 268            ToolCallContent::Diff(diff) => Some(diff),
 269            ToolCallContent::ContentBlock(_) => None,
 270            ToolCallContent::Terminal(_) => None,
 271        })
 272    }
 273
 274    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 275        self.content.iter().filter_map(|content| match content {
 276            ToolCallContent::Terminal(terminal) => Some(terminal),
 277            ToolCallContent::ContentBlock(_) => None,
 278            ToolCallContent::Diff(_) => None,
 279        })
 280    }
 281
 282    fn to_markdown(&self, cx: &App) -> String {
 283        let mut markdown = format!(
 284            "**Tool Call: {}**\nStatus: {}\n\n",
 285            self.label.read(cx).source(),
 286            self.status
 287        );
 288        for content in &self.content {
 289            markdown.push_str(content.to_markdown(cx).as_str());
 290            markdown.push_str("\n\n");
 291        }
 292        markdown
 293    }
 294
 295    async fn resolve_location(
 296        location: acp::ToolCallLocation,
 297        project: WeakEntity<Project>,
 298        cx: &mut AsyncApp,
 299    ) -> Option<AgentLocation> {
 300        let buffer = project
 301            .update(cx, |project, cx| {
 302                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 303                    Some(project.open_buffer(path, cx))
 304                } else {
 305                    None
 306                }
 307            })
 308            .ok()??;
 309        let buffer = buffer.await.log_err()?;
 310        let position = buffer
 311            .update(cx, |buffer, _| {
 312                if let Some(row) = location.line {
 313                    let snapshot = buffer.snapshot();
 314                    let column = snapshot.indent_size_for_line(row).len;
 315                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 316                    snapshot.anchor_before(point)
 317                } else {
 318                    Anchor::MIN
 319                }
 320            })
 321            .ok()?;
 322
 323        Some(AgentLocation {
 324            buffer: buffer.downgrade(),
 325            position,
 326        })
 327    }
 328
 329    fn resolve_locations(
 330        &self,
 331        project: Entity<Project>,
 332        cx: &mut App,
 333    ) -> Task<Vec<Option<AgentLocation>>> {
 334        let locations = self.locations.clone();
 335        project.update(cx, |_, cx| {
 336            cx.spawn(async move |project, cx| {
 337                let mut new_locations = Vec::new();
 338                for location in locations {
 339                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 340                }
 341                new_locations
 342            })
 343        })
 344    }
 345}
 346
 347#[derive(Debug)]
 348pub enum ToolCallStatus {
 349    /// The tool call hasn't started running yet, but we start showing it to
 350    /// the user.
 351    Pending,
 352    /// The tool call is waiting for confirmation from the user.
 353    WaitingForConfirmation {
 354        options: Vec<acp::PermissionOption>,
 355        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 356    },
 357    /// The tool call is currently running.
 358    InProgress,
 359    /// The tool call completed successfully.
 360    Completed,
 361    /// The tool call failed.
 362    Failed,
 363    /// The user rejected the tool call.
 364    Rejected,
 365    /// The user canceled generation so the tool call was canceled.
 366    Canceled,
 367}
 368
 369impl From<acp::ToolCallStatus> for ToolCallStatus {
 370    fn from(status: acp::ToolCallStatus) -> Self {
 371        match status {
 372            acp::ToolCallStatus::Pending => Self::Pending,
 373            acp::ToolCallStatus::InProgress => Self::InProgress,
 374            acp::ToolCallStatus::Completed => Self::Completed,
 375            acp::ToolCallStatus::Failed => Self::Failed,
 376        }
 377    }
 378}
 379
 380impl Display for ToolCallStatus {
 381    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 382        write!(
 383            f,
 384            "{}",
 385            match self {
 386                ToolCallStatus::Pending => "Pending",
 387                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 388                ToolCallStatus::InProgress => "In Progress",
 389                ToolCallStatus::Completed => "Completed",
 390                ToolCallStatus::Failed => "Failed",
 391                ToolCallStatus::Rejected => "Rejected",
 392                ToolCallStatus::Canceled => "Canceled",
 393            }
 394        )
 395    }
 396}
 397
 398#[derive(Debug, PartialEq, Clone)]
 399pub enum ContentBlock {
 400    Empty,
 401    Markdown { markdown: Entity<Markdown> },
 402    ResourceLink { resource_link: acp::ResourceLink },
 403}
 404
 405impl ContentBlock {
 406    pub fn new(
 407        block: acp::ContentBlock,
 408        language_registry: &Arc<LanguageRegistry>,
 409        cx: &mut App,
 410    ) -> Self {
 411        let mut this = Self::Empty;
 412        this.append(block, language_registry, cx);
 413        this
 414    }
 415
 416    pub fn new_combined(
 417        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 418        language_registry: Arc<LanguageRegistry>,
 419        cx: &mut App,
 420    ) -> Self {
 421        let mut this = Self::Empty;
 422        for block in blocks {
 423            this.append(block, &language_registry, cx);
 424        }
 425        this
 426    }
 427
 428    pub fn append(
 429        &mut self,
 430        block: acp::ContentBlock,
 431        language_registry: &Arc<LanguageRegistry>,
 432        cx: &mut App,
 433    ) {
 434        if matches!(self, ContentBlock::Empty) {
 435            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 436                *self = ContentBlock::ResourceLink { resource_link };
 437                return;
 438            }
 439        }
 440
 441        let new_content = self.block_string_contents(block);
 442
 443        match self {
 444            ContentBlock::Empty => {
 445                *self = Self::create_markdown_block(new_content, language_registry, cx);
 446            }
 447            ContentBlock::Markdown { markdown } => {
 448                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 449            }
 450            ContentBlock::ResourceLink { resource_link } => {
 451                let existing_content = Self::resource_link_md(&resource_link.uri);
 452                let combined = format!("{}\n{}", existing_content, new_content);
 453
 454                *self = Self::create_markdown_block(combined, language_registry, cx);
 455            }
 456        }
 457    }
 458
 459    fn create_markdown_block(
 460        content: String,
 461        language_registry: &Arc<LanguageRegistry>,
 462        cx: &mut App,
 463    ) -> ContentBlock {
 464        ContentBlock::Markdown {
 465            markdown: cx
 466                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 467        }
 468    }
 469
 470    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 471        match block {
 472            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 473            acp::ContentBlock::ResourceLink(resource_link) => {
 474                Self::resource_link_md(&resource_link.uri)
 475            }
 476            acp::ContentBlock::Resource(acp::EmbeddedResource {
 477                resource:
 478                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 479                        uri,
 480                        ..
 481                    }),
 482                ..
 483            }) => Self::resource_link_md(&uri),
 484            acp::ContentBlock::Image(image) => Self::image_md(&image),
 485            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 486        }
 487    }
 488
 489    fn resource_link_md(uri: &str) -> String {
 490        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 491            uri.as_link().to_string()
 492        } else {
 493            uri.to_string()
 494        }
 495    }
 496
 497    fn image_md(_image: &acp::ImageContent) -> String {
 498        "`Image`".into()
 499    }
 500
 501    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 502        match self {
 503            ContentBlock::Empty => "",
 504            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 505            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 506        }
 507    }
 508
 509    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 510        match self {
 511            ContentBlock::Empty => None,
 512            ContentBlock::Markdown { markdown } => Some(markdown),
 513            ContentBlock::ResourceLink { .. } => None,
 514        }
 515    }
 516
 517    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 518        match self {
 519            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 520            _ => None,
 521        }
 522    }
 523}
 524
 525#[derive(Debug)]
 526pub enum ToolCallContent {
 527    ContentBlock(ContentBlock),
 528    Diff(Entity<Diff>),
 529    Terminal(Entity<Terminal>),
 530}
 531
 532impl ToolCallContent {
 533    pub fn from_acp(
 534        content: acp::ToolCallContent,
 535        language_registry: Arc<LanguageRegistry>,
 536        cx: &mut App,
 537    ) -> Self {
 538        match content {
 539            acp::ToolCallContent::Content { content } => {
 540                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 541            }
 542            acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
 543                Diff::finalized(
 544                    diff.path,
 545                    diff.old_text,
 546                    diff.new_text,
 547                    language_registry,
 548                    cx,
 549                )
 550            })),
 551        }
 552    }
 553
 554    pub fn to_markdown(&self, cx: &App) -> String {
 555        match self {
 556            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 557            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 558            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 559        }
 560    }
 561}
 562
 563#[derive(Debug, PartialEq)]
 564pub enum ToolCallUpdate {
 565    UpdateFields(acp::ToolCallUpdate),
 566    UpdateDiff(ToolCallUpdateDiff),
 567    UpdateTerminal(ToolCallUpdateTerminal),
 568}
 569
 570impl ToolCallUpdate {
 571    fn id(&self) -> &acp::ToolCallId {
 572        match self {
 573            Self::UpdateFields(update) => &update.id,
 574            Self::UpdateDiff(diff) => &diff.id,
 575            Self::UpdateTerminal(terminal) => &terminal.id,
 576        }
 577    }
 578}
 579
 580impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 581    fn from(update: acp::ToolCallUpdate) -> Self {
 582        Self::UpdateFields(update)
 583    }
 584}
 585
 586impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 587    fn from(diff: ToolCallUpdateDiff) -> Self {
 588        Self::UpdateDiff(diff)
 589    }
 590}
 591
 592#[derive(Debug, PartialEq)]
 593pub struct ToolCallUpdateDiff {
 594    pub id: acp::ToolCallId,
 595    pub diff: Entity<Diff>,
 596}
 597
 598impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 599    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 600        Self::UpdateTerminal(terminal)
 601    }
 602}
 603
 604#[derive(Debug, PartialEq)]
 605pub struct ToolCallUpdateTerminal {
 606    pub id: acp::ToolCallId,
 607    pub terminal: Entity<Terminal>,
 608}
 609
 610#[derive(Debug, Default)]
 611pub struct Plan {
 612    pub entries: Vec<PlanEntry>,
 613}
 614
 615#[derive(Debug)]
 616pub struct PlanStats<'a> {
 617    pub in_progress_entry: Option<&'a PlanEntry>,
 618    pub pending: u32,
 619    pub completed: u32,
 620}
 621
 622impl Plan {
 623    pub fn is_empty(&self) -> bool {
 624        self.entries.is_empty()
 625    }
 626
 627    pub fn stats(&self) -> PlanStats<'_> {
 628        let mut stats = PlanStats {
 629            in_progress_entry: None,
 630            pending: 0,
 631            completed: 0,
 632        };
 633
 634        for entry in &self.entries {
 635            match &entry.status {
 636                acp::PlanEntryStatus::Pending => {
 637                    stats.pending += 1;
 638                }
 639                acp::PlanEntryStatus::InProgress => {
 640                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 641                }
 642                acp::PlanEntryStatus::Completed => {
 643                    stats.completed += 1;
 644                }
 645            }
 646        }
 647
 648        stats
 649    }
 650}
 651
 652#[derive(Debug)]
 653pub struct PlanEntry {
 654    pub content: Entity<Markdown>,
 655    pub priority: acp::PlanEntryPriority,
 656    pub status: acp::PlanEntryStatus,
 657}
 658
 659impl PlanEntry {
 660    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 661        Self {
 662            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 663            priority: entry.priority,
 664            status: entry.status,
 665        }
 666    }
 667}
 668
 669#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
 670pub struct AgentServerName(pub SharedString);
 671
 672#[derive(Debug, Clone, Serialize, Deserialize)]
 673pub struct AcpThreadMetadata {
 674    pub agent: AgentServerName,
 675    pub id: acp::SessionId,
 676    pub title: SharedString,
 677    pub updated_at: DateTime<Utc>,
 678}
 679
 680pub struct AcpThread {
 681    title: SharedString,
 682    entries: Vec<AgentThreadEntry>,
 683    plan: Plan,
 684    project: Entity<Project>,
 685    action_log: Entity<ActionLog>,
 686    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 687    send_task: Option<Task<()>>,
 688    connection: Rc<dyn AgentConnection>,
 689    session_id: acp::SessionId,
 690}
 691
 692pub enum AcpThreadEvent {
 693    NewEntry,
 694    TitleUpdated,
 695    EntryUpdated(usize),
 696    EntriesRemoved(Range<usize>),
 697    ToolAuthorizationRequired,
 698    Stopped,
 699    Error,
 700    ServerExited(ExitStatus),
 701}
 702
 703impl EventEmitter<AcpThreadEvent> for AcpThread {}
 704
 705#[derive(PartialEq, Eq)]
 706pub enum ThreadStatus {
 707    Idle,
 708    WaitingForToolConfirmation,
 709    Generating,
 710}
 711
 712#[derive(Debug, Clone)]
 713pub enum LoadError {
 714    Unsupported {
 715        error_message: SharedString,
 716        upgrade_message: SharedString,
 717        upgrade_command: String,
 718    },
 719    Exited(i32),
 720    Other(SharedString),
 721}
 722
 723impl Display for LoadError {
 724    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 725        match self {
 726            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 727            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 728            LoadError::Other(msg) => write!(f, "{}", msg),
 729        }
 730    }
 731}
 732
 733impl Error for LoadError {}
 734
 735impl AcpThread {
 736    pub fn new(
 737        title: impl Into<SharedString>,
 738        connection: Rc<dyn AgentConnection>,
 739        project: Entity<Project>,
 740        session_id: acp::SessionId,
 741        cx: &mut Context<Self>,
 742    ) -> Self {
 743        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 744
 745        Self {
 746            action_log,
 747            shared_buffers: Default::default(),
 748            entries: Default::default(),
 749            plan: Default::default(),
 750            title: title.into(),
 751            project,
 752            send_task: None,
 753            connection,
 754            session_id,
 755        }
 756    }
 757
 758    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 759        &self.connection
 760    }
 761
 762    pub fn action_log(&self) -> &Entity<ActionLog> {
 763        &self.action_log
 764    }
 765
 766    pub fn project(&self) -> &Entity<Project> {
 767        &self.project
 768    }
 769
 770    pub fn title(&self) -> SharedString {
 771        self.title.clone()
 772    }
 773
 774    pub fn entries(&self) -> &[AgentThreadEntry] {
 775        &self.entries
 776    }
 777
 778    pub fn session_id(&self) -> &acp::SessionId {
 779        &self.session_id
 780    }
 781
 782    pub fn status(&self) -> ThreadStatus {
 783        if self.send_task.is_some() {
 784            if self.waiting_for_tool_confirmation() {
 785                ThreadStatus::WaitingForToolConfirmation
 786            } else {
 787                ThreadStatus::Generating
 788            }
 789        } else {
 790            ThreadStatus::Idle
 791        }
 792    }
 793
 794    pub fn has_pending_edit_tool_calls(&self) -> bool {
 795        for entry in self.entries.iter().rev() {
 796            match entry {
 797                AgentThreadEntry::UserMessage(_) => return false,
 798                AgentThreadEntry::ToolCall(
 799                    call @ ToolCall {
 800                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 801                        ..
 802                    },
 803                ) if call.diffs().next().is_some() => {
 804                    return true;
 805                }
 806                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 807            }
 808        }
 809
 810        false
 811    }
 812
 813    pub fn used_tools_since_last_user_message(&self) -> bool {
 814        for entry in self.entries.iter().rev() {
 815            match entry {
 816                AgentThreadEntry::UserMessage(..) => return false,
 817                AgentThreadEntry::AssistantMessage(..) => continue,
 818                AgentThreadEntry::ToolCall(..) => return true,
 819            }
 820        }
 821
 822        false
 823    }
 824
 825    pub fn handle_session_update(
 826        &mut self,
 827        update: acp::SessionUpdate,
 828        cx: &mut Context<Self>,
 829    ) -> Result<(), acp::Error> {
 830        match update {
 831            acp::SessionUpdate::UserMessageChunk { content } => {
 832                self.push_user_content_block(None, content, cx);
 833            }
 834            acp::SessionUpdate::AgentMessageChunk { content } => {
 835                self.push_assistant_content_block(content, false, cx);
 836            }
 837            acp::SessionUpdate::AgentThoughtChunk { content } => {
 838                self.push_assistant_content_block(content, true, cx);
 839            }
 840            acp::SessionUpdate::ToolCall(tool_call) => {
 841                self.upsert_tool_call(tool_call, cx)?;
 842            }
 843            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 844                self.update_tool_call(tool_call_update, cx)?;
 845            }
 846            acp::SessionUpdate::Plan(plan) => {
 847                self.update_plan(plan, cx);
 848            }
 849        }
 850        Ok(())
 851    }
 852
 853    pub fn push_user_content_block(
 854        &mut self,
 855        message_id: Option<UserMessageId>,
 856        chunk: acp::ContentBlock,
 857        cx: &mut Context<Self>,
 858    ) {
 859        let language_registry = self.project.read(cx).languages().clone();
 860        let entries_len = self.entries.len();
 861
 862        if let Some(last_entry) = self.entries.last_mut()
 863            && let AgentThreadEntry::UserMessage(UserMessage {
 864                id,
 865                content,
 866                chunks,
 867                ..
 868            }) = last_entry
 869        {
 870            *id = message_id.or(id.take());
 871            content.append(chunk.clone(), &language_registry, cx);
 872            chunks.push(chunk);
 873            let idx = entries_len - 1;
 874            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 875        } else {
 876            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 877            self.push_entry(
 878                AgentThreadEntry::UserMessage(UserMessage {
 879                    id: message_id,
 880                    content,
 881                    chunks: vec![chunk],
 882                    checkpoint: None,
 883                }),
 884                cx,
 885            );
 886        }
 887    }
 888
 889    pub fn push_assistant_content_block(
 890        &mut self,
 891        chunk: acp::ContentBlock,
 892        is_thought: bool,
 893        cx: &mut Context<Self>,
 894    ) {
 895        let language_registry = self.project.read(cx).languages().clone();
 896        let entries_len = self.entries.len();
 897        if let Some(last_entry) = self.entries.last_mut()
 898            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 899        {
 900            let idx = entries_len - 1;
 901            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 902            match (chunks.last_mut(), is_thought) {
 903                (Some(AssistantMessageChunk::Message { block }), false)
 904                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 905                    block.append(chunk, &language_registry, cx)
 906                }
 907                _ => {
 908                    let block = ContentBlock::new(chunk, &language_registry, cx);
 909                    if is_thought {
 910                        chunks.push(AssistantMessageChunk::Thought { block })
 911                    } else {
 912                        chunks.push(AssistantMessageChunk::Message { block })
 913                    }
 914                }
 915            }
 916        } else {
 917            let block = ContentBlock::new(chunk, &language_registry, cx);
 918            let chunk = if is_thought {
 919                AssistantMessageChunk::Thought { block }
 920            } else {
 921                AssistantMessageChunk::Message { block }
 922            };
 923
 924            self.push_entry(
 925                AgentThreadEntry::AssistantMessage(AssistantMessage {
 926                    chunks: vec![chunk],
 927                }),
 928                cx,
 929            );
 930        }
 931    }
 932
 933    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 934        self.entries.push(entry);
 935        cx.emit(AcpThreadEvent::NewEntry);
 936    }
 937
 938    pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
 939        self.title = title;
 940        cx.emit(AcpThreadEvent::TitleUpdated);
 941        Ok(())
 942    }
 943
 944    pub fn update_tool_call(
 945        &mut self,
 946        update: impl Into<ToolCallUpdate>,
 947        cx: &mut Context<Self>,
 948    ) -> Result<()> {
 949        let update = update.into();
 950        let languages = self.project.read(cx).languages().clone();
 951
 952        let (ix, current_call) = self
 953            .tool_call_mut(update.id())
 954            .context("Tool call not found")?;
 955        match update {
 956            ToolCallUpdate::UpdateFields(update) => {
 957                let location_updated = update.fields.locations.is_some();
 958                current_call.update_fields(update.fields, languages, cx);
 959                if location_updated {
 960                    self.resolve_locations(update.id.clone(), cx);
 961                }
 962            }
 963            ToolCallUpdate::UpdateDiff(update) => {
 964                current_call.content.clear();
 965                current_call
 966                    .content
 967                    .push(ToolCallContent::Diff(update.diff));
 968            }
 969            ToolCallUpdate::UpdateTerminal(update) => {
 970                current_call.content.clear();
 971                current_call
 972                    .content
 973                    .push(ToolCallContent::Terminal(update.terminal));
 974            }
 975        }
 976
 977        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 978
 979        Ok(())
 980    }
 981
 982    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 983    pub fn upsert_tool_call(
 984        &mut self,
 985        tool_call: acp::ToolCall,
 986        cx: &mut Context<Self>,
 987    ) -> Result<(), acp::Error> {
 988        let status = tool_call.status.into();
 989        self.upsert_tool_call_inner(tool_call.into(), status, cx)
 990    }
 991
 992    /// Fails if id does not match an existing entry.
 993    pub fn upsert_tool_call_inner(
 994        &mut self,
 995        tool_call_update: acp::ToolCallUpdate,
 996        status: ToolCallStatus,
 997        cx: &mut Context<Self>,
 998    ) -> Result<(), acp::Error> {
 999        let language_registry = self.project.read(cx).languages().clone();
1000        let id = tool_call_update.id.clone();
1001
1002        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1003            current_call.update_fields(tool_call_update.fields, language_registry, cx);
1004            current_call.status = status;
1005
1006            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1007        } else {
1008            let call =
1009                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1010            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1011        };
1012
1013        self.resolve_locations(id, cx);
1014        Ok(())
1015    }
1016
1017    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1018        // The tool call we are looking for is typically the last one, or very close to the end.
1019        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1020        self.entries
1021            .iter_mut()
1022            .enumerate()
1023            .rev()
1024            .find_map(|(index, tool_call)| {
1025                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1026                    && &tool_call.id == id
1027                {
1028                    Some((index, tool_call))
1029                } else {
1030                    None
1031                }
1032            })
1033    }
1034
1035    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1036        let project = self.project.clone();
1037        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1038            return;
1039        };
1040        let task = tool_call.resolve_locations(project, cx);
1041        cx.spawn(async move |this, cx| {
1042            let resolved_locations = task.await;
1043            this.update(cx, |this, cx| {
1044                let project = this.project.clone();
1045                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1046                    return;
1047                };
1048                if let Some(Some(location)) = resolved_locations.last() {
1049                    project.update(cx, |project, cx| {
1050                        if let Some(agent_location) = project.agent_location() {
1051                            let should_ignore = agent_location.buffer == location.buffer
1052                                && location
1053                                    .buffer
1054                                    .update(cx, |buffer, _| {
1055                                        let snapshot = buffer.snapshot();
1056                                        let old_position =
1057                                            agent_location.position.to_point(&snapshot);
1058                                        let new_position = location.position.to_point(&snapshot);
1059                                        // ignore this so that when we get updates from the edit tool
1060                                        // the position doesn't reset to the startof line
1061                                        old_position.row == new_position.row
1062                                            && old_position.column > new_position.column
1063                                    })
1064                                    .ok()
1065                                    .unwrap_or_default();
1066                            if !should_ignore {
1067                                project.set_agent_location(Some(location.clone()), cx);
1068                            }
1069                        }
1070                    });
1071                }
1072                if tool_call.resolved_locations != resolved_locations {
1073                    tool_call.resolved_locations = resolved_locations;
1074                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1075                }
1076            })
1077        })
1078        .detach();
1079    }
1080
1081    pub fn request_tool_call_authorization(
1082        &mut self,
1083        tool_call: acp::ToolCallUpdate,
1084        options: Vec<acp::PermissionOption>,
1085        cx: &mut Context<Self>,
1086    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1087        let (tx, rx) = oneshot::channel();
1088
1089        let status = ToolCallStatus::WaitingForConfirmation {
1090            options,
1091            respond_tx: tx,
1092        };
1093
1094        self.upsert_tool_call_inner(tool_call, status, cx)?;
1095        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1096        Ok(rx)
1097    }
1098
1099    pub fn authorize_tool_call(
1100        &mut self,
1101        id: acp::ToolCallId,
1102        option_id: acp::PermissionOptionId,
1103        option_kind: acp::PermissionOptionKind,
1104        cx: &mut Context<Self>,
1105    ) {
1106        let Some((ix, call)) = self.tool_call_mut(&id) else {
1107            return;
1108        };
1109
1110        let new_status = match option_kind {
1111            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1112                ToolCallStatus::Rejected
1113            }
1114            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1115                ToolCallStatus::InProgress
1116            }
1117        };
1118
1119        let curr_status = mem::replace(&mut call.status, new_status);
1120
1121        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1122            respond_tx.send(option_id).log_err();
1123        } else if cfg!(debug_assertions) {
1124            panic!("tried to authorize an already authorized tool call");
1125        }
1126
1127        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1128    }
1129
1130    /// Returns true if the last turn is awaiting tool authorization
1131    pub fn waiting_for_tool_confirmation(&self) -> bool {
1132        for entry in self.entries.iter().rev() {
1133            match &entry {
1134                AgentThreadEntry::ToolCall(call) => match call.status {
1135                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1136                    ToolCallStatus::Pending
1137                    | ToolCallStatus::InProgress
1138                    | ToolCallStatus::Completed
1139                    | ToolCallStatus::Failed
1140                    | ToolCallStatus::Rejected
1141                    | ToolCallStatus::Canceled => continue,
1142                },
1143                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1144                    // Reached the beginning of the turn
1145                    return false;
1146                }
1147            }
1148        }
1149        false
1150    }
1151
1152    pub fn plan(&self) -> &Plan {
1153        &self.plan
1154    }
1155
1156    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1157        let new_entries_len = request.entries.len();
1158        let mut new_entries = request.entries.into_iter();
1159
1160        // Reuse existing markdown to prevent flickering
1161        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1162            let PlanEntry {
1163                content,
1164                priority,
1165                status,
1166            } = old;
1167            content.update(cx, |old, cx| {
1168                old.replace(new.content, cx);
1169            });
1170            *priority = new.priority;
1171            *status = new.status;
1172        }
1173        for new in new_entries {
1174            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1175        }
1176        self.plan.entries.truncate(new_entries_len);
1177
1178        cx.notify();
1179    }
1180
1181    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1182        self.plan
1183            .entries
1184            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1185        cx.notify();
1186    }
1187
1188    #[cfg(any(test, feature = "test-support"))]
1189    pub fn send_raw(
1190        &mut self,
1191        message: &str,
1192        cx: &mut Context<Self>,
1193    ) -> BoxFuture<'static, Result<()>> {
1194        self.send(
1195            vec![acp::ContentBlock::Text(acp::TextContent {
1196                text: message.to_string(),
1197                annotations: None,
1198            })],
1199            cx,
1200        )
1201    }
1202
1203    pub fn send(
1204        &mut self,
1205        message: Vec<acp::ContentBlock>,
1206        cx: &mut Context<Self>,
1207    ) -> BoxFuture<'static, Result<()>> {
1208        let block = ContentBlock::new_combined(
1209            message.clone(),
1210            self.project.read(cx).languages().clone(),
1211            cx,
1212        );
1213        let request = acp::PromptRequest {
1214            prompt: message.clone(),
1215            session_id: self.session_id.clone(),
1216        };
1217        let git_store = self.project.read(cx).git_store().clone();
1218
1219        let message_id = if self
1220            .connection
1221            .session_editor(&self.session_id, cx)
1222            .is_some()
1223        {
1224            Some(UserMessageId::new())
1225        } else {
1226            None
1227        };
1228        self.push_entry(
1229            AgentThreadEntry::UserMessage(UserMessage {
1230                id: message_id.clone(),
1231                content: block,
1232                chunks: message,
1233                checkpoint: None,
1234            }),
1235            cx,
1236        );
1237
1238        self.run_turn(cx, async move |this, cx| {
1239            let old_checkpoint = git_store
1240                .update(cx, |git, cx| git.checkpoint(cx))?
1241                .await
1242                .context("failed to get old checkpoint")
1243                .log_err();
1244            this.update(cx, |this, cx| {
1245                if let Some((_ix, message)) = this.last_user_message() {
1246                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1247                        git_checkpoint,
1248                        show: false,
1249                    });
1250                }
1251                this.connection.prompt(message_id, request, cx)
1252            })?
1253            .await
1254        })
1255    }
1256
1257    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1258        self.run_turn(cx, async move |this, cx| {
1259            this.update(cx, |this, cx| {
1260                this.connection
1261                    .resume(&this.session_id, cx)
1262                    .map(|resume| resume.run(cx))
1263            })?
1264            .context("resuming a session is not supported")?
1265            .await
1266        })
1267    }
1268
1269    fn run_turn(
1270        &mut self,
1271        cx: &mut Context<Self>,
1272        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1273    ) -> BoxFuture<'static, Result<()>> {
1274        self.clear_completed_plan_entries(cx);
1275
1276        let (tx, rx) = oneshot::channel();
1277        let cancel_task = self.cancel(cx);
1278
1279        self.send_task = Some(cx.spawn(async move |this, cx| {
1280            cancel_task.await;
1281            tx.send(f(this, cx).await).ok();
1282        }));
1283
1284        cx.spawn(async move |this, cx| {
1285            let response = rx.await;
1286
1287            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1288                .await?;
1289
1290            this.update(cx, |this, cx| {
1291                match response {
1292                    Ok(Err(e)) => {
1293                        this.send_task.take();
1294                        cx.emit(AcpThreadEvent::Error);
1295                        Err(e)
1296                    }
1297                    result => {
1298                        let canceled = matches!(
1299                            result,
1300                            Ok(Ok(acp::PromptResponse {
1301                                stop_reason: acp::StopReason::Canceled
1302                            }))
1303                        );
1304
1305                        // We only take the task if the current prompt wasn't canceled.
1306                        //
1307                        // This prompt may have been canceled because another one was sent
1308                        // while it was still generating. In these cases, dropping `send_task`
1309                        // would cause the next generation to be canceled.
1310                        if !canceled {
1311                            this.send_task.take();
1312                        }
1313
1314                        cx.emit(AcpThreadEvent::Stopped);
1315                        Ok(())
1316                    }
1317                }
1318            })?
1319        })
1320        .boxed()
1321    }
1322
1323    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1324        let Some(send_task) = self.send_task.take() else {
1325            return Task::ready(());
1326        };
1327
1328        for entry in self.entries.iter_mut() {
1329            if let AgentThreadEntry::ToolCall(call) = entry {
1330                let cancel = matches!(
1331                    call.status,
1332                    ToolCallStatus::Pending
1333                        | ToolCallStatus::WaitingForConfirmation { .. }
1334                        | ToolCallStatus::InProgress
1335                );
1336
1337                if cancel {
1338                    call.status = ToolCallStatus::Canceled;
1339                }
1340            }
1341        }
1342
1343        self.connection.cancel(&self.session_id, cx);
1344
1345        // Wait for the send task to complete
1346        cx.foreground_executor().spawn(send_task)
1347    }
1348
1349    /// Rewinds this thread to before the entry at `index`, removing it and all
1350    /// subsequent entries while reverting any changes made from that point.
1351    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1352        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1353            return Task::ready(Err(anyhow!("not supported")));
1354        };
1355        let Some(message) = self.user_message(&id) else {
1356            return Task::ready(Err(anyhow!("message not found")));
1357        };
1358
1359        let checkpoint = message
1360            .checkpoint
1361            .as_ref()
1362            .map(|c| c.git_checkpoint.clone());
1363
1364        let git_store = self.project.read(cx).git_store().clone();
1365        cx.spawn(async move |this, cx| {
1366            if let Some(checkpoint) = checkpoint {
1367                git_store
1368                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1369                    .await?;
1370            }
1371
1372            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1373                .await?;
1374            this.update(cx, |this, cx| {
1375                if let Some((ix, _)) = this.user_message_mut(&id) {
1376                    let range = ix..this.entries.len();
1377                    this.entries.truncate(ix);
1378                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1379                }
1380            })
1381        })
1382    }
1383
1384    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1385        let git_store = self.project.read(cx).git_store().clone();
1386
1387        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1388            if let Some(checkpoint) = message.checkpoint.as_ref() {
1389                checkpoint.git_checkpoint.clone()
1390            } else {
1391                return Task::ready(Ok(()));
1392            }
1393        } else {
1394            return Task::ready(Ok(()));
1395        };
1396
1397        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1398        cx.spawn(async move |this, cx| {
1399            let new_checkpoint = new_checkpoint
1400                .await
1401                .context("failed to get new checkpoint")
1402                .log_err();
1403            if let Some(new_checkpoint) = new_checkpoint {
1404                let equal = git_store
1405                    .update(cx, |git, cx| {
1406                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1407                    })?
1408                    .await
1409                    .unwrap_or(true);
1410                this.update(cx, |this, cx| {
1411                    let (ix, message) = this.last_user_message().context("no user message")?;
1412                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1413                    checkpoint.show = !equal;
1414                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1415                    anyhow::Ok(())
1416                })??;
1417            }
1418
1419            Ok(())
1420        })
1421    }
1422
1423    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1424        self.entries
1425            .iter_mut()
1426            .enumerate()
1427            .rev()
1428            .find_map(|(ix, entry)| {
1429                if let AgentThreadEntry::UserMessage(message) = entry {
1430                    Some((ix, message))
1431                } else {
1432                    None
1433                }
1434            })
1435    }
1436
1437    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1438        self.entries.iter().find_map(|entry| {
1439            if let AgentThreadEntry::UserMessage(message) = entry {
1440                if message.id.as_ref() == Some(&id) {
1441                    Some(message)
1442                } else {
1443                    None
1444                }
1445            } else {
1446                None
1447            }
1448        })
1449    }
1450
1451    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1452        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1453            if let AgentThreadEntry::UserMessage(message) = entry {
1454                if message.id.as_ref() == Some(&id) {
1455                    Some((ix, message))
1456                } else {
1457                    None
1458                }
1459            } else {
1460                None
1461            }
1462        })
1463    }
1464
1465    pub fn read_text_file(
1466        &self,
1467        path: PathBuf,
1468        line: Option<u32>,
1469        limit: Option<u32>,
1470        reuse_shared_snapshot: bool,
1471        cx: &mut Context<Self>,
1472    ) -> Task<Result<String>> {
1473        let project = self.project.clone();
1474        let action_log = self.action_log.clone();
1475        cx.spawn(async move |this, cx| {
1476            let load = project.update(cx, |project, cx| {
1477                let path = project
1478                    .project_path_for_absolute_path(&path, cx)
1479                    .context("invalid path")?;
1480                anyhow::Ok(project.open_buffer(path, cx))
1481            });
1482            let buffer = load??.await?;
1483
1484            let snapshot = if reuse_shared_snapshot {
1485                this.read_with(cx, |this, _| {
1486                    this.shared_buffers.get(&buffer.clone()).cloned()
1487                })
1488                .log_err()
1489                .flatten()
1490            } else {
1491                None
1492            };
1493
1494            let snapshot = if let Some(snapshot) = snapshot {
1495                snapshot
1496            } else {
1497                action_log.update(cx, |action_log, cx| {
1498                    action_log.buffer_read(buffer.clone(), cx);
1499                })?;
1500                project.update(cx, |project, cx| {
1501                    let position = buffer
1502                        .read(cx)
1503                        .snapshot()
1504                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1505                    project.set_agent_location(
1506                        Some(AgentLocation {
1507                            buffer: buffer.downgrade(),
1508                            position,
1509                        }),
1510                        cx,
1511                    );
1512                })?;
1513
1514                buffer.update(cx, |buffer, _| buffer.snapshot())?
1515            };
1516
1517            this.update(cx, |this, _| {
1518                let text = snapshot.text();
1519                this.shared_buffers.insert(buffer.clone(), snapshot);
1520                if line.is_none() && limit.is_none() {
1521                    return Ok(text);
1522                }
1523                let limit = limit.unwrap_or(u32::MAX) as usize;
1524                let Some(line) = line else {
1525                    return Ok(text.lines().take(limit).collect::<String>());
1526                };
1527
1528                let count = text.lines().count();
1529                if count < line as usize {
1530                    anyhow::bail!("There are only {} lines", count);
1531                }
1532                Ok(text
1533                    .lines()
1534                    .skip(line as usize + 1)
1535                    .take(limit)
1536                    .collect::<String>())
1537            })?
1538        })
1539    }
1540
1541    pub fn write_text_file(
1542        &self,
1543        path: PathBuf,
1544        content: String,
1545        cx: &mut Context<Self>,
1546    ) -> Task<Result<()>> {
1547        let project = self.project.clone();
1548        let action_log = self.action_log.clone();
1549        cx.spawn(async move |this, cx| {
1550            let load = project.update(cx, |project, cx| {
1551                let path = project
1552                    .project_path_for_absolute_path(&path, cx)
1553                    .context("invalid path")?;
1554                anyhow::Ok(project.open_buffer(path, cx))
1555            });
1556            let buffer = load??.await?;
1557            let snapshot = this.update(cx, |this, cx| {
1558                this.shared_buffers
1559                    .get(&buffer)
1560                    .cloned()
1561                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1562            })?;
1563            let edits = cx
1564                .background_executor()
1565                .spawn(async move {
1566                    let old_text = snapshot.text();
1567                    text_diff(old_text.as_str(), &content)
1568                        .into_iter()
1569                        .map(|(range, replacement)| {
1570                            (
1571                                snapshot.anchor_after(range.start)
1572                                    ..snapshot.anchor_before(range.end),
1573                                replacement,
1574                            )
1575                        })
1576                        .collect::<Vec<_>>()
1577                })
1578                .await;
1579            cx.update(|cx| {
1580                project.update(cx, |project, cx| {
1581                    project.set_agent_location(
1582                        Some(AgentLocation {
1583                            buffer: buffer.downgrade(),
1584                            position: edits
1585                                .last()
1586                                .map(|(range, _)| range.end)
1587                                .unwrap_or(Anchor::MIN),
1588                        }),
1589                        cx,
1590                    );
1591                });
1592
1593                action_log.update(cx, |action_log, cx| {
1594                    action_log.buffer_read(buffer.clone(), cx);
1595                });
1596                buffer.update(cx, |buffer, cx| {
1597                    buffer.edit(edits, None, cx);
1598                });
1599                action_log.update(cx, |action_log, cx| {
1600                    action_log.buffer_edited(buffer.clone(), cx);
1601                });
1602            })?;
1603            project
1604                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1605                .await
1606        })
1607    }
1608
1609    pub fn to_markdown(&self, cx: &App) -> String {
1610        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1611    }
1612
1613    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1614        cx.emit(AcpThreadEvent::ServerExited(status));
1615    }
1616}
1617
1618fn markdown_for_raw_output(
1619    raw_output: &serde_json::Value,
1620    language_registry: &Arc<LanguageRegistry>,
1621    cx: &mut App,
1622) -> Option<Entity<Markdown>> {
1623    match raw_output {
1624        serde_json::Value::Null => None,
1625        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1626            Markdown::new(
1627                value.to_string().into(),
1628                Some(language_registry.clone()),
1629                None,
1630                cx,
1631            )
1632        })),
1633        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1634            Markdown::new(
1635                value.to_string().into(),
1636                Some(language_registry.clone()),
1637                None,
1638                cx,
1639            )
1640        })),
1641        serde_json::Value::String(value) => Some(cx.new(|cx| {
1642            Markdown::new(
1643                value.clone().into(),
1644                Some(language_registry.clone()),
1645                None,
1646                cx,
1647            )
1648        })),
1649        value => Some(cx.new(|cx| {
1650            Markdown::new(
1651                format!("```json\n{}\n```", value).into(),
1652                Some(language_registry.clone()),
1653                None,
1654                cx,
1655            )
1656        })),
1657    }
1658}
1659
1660#[cfg(test)]
1661mod tests {
1662    use super::*;
1663    use anyhow::anyhow;
1664    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1665    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1666    use indoc::indoc;
1667    use project::{FakeFs, Fs};
1668    use rand::Rng as _;
1669    use serde_json::json;
1670    use settings::SettingsStore;
1671    use smol::stream::StreamExt as _;
1672    use std::{
1673        any::Any,
1674        cell::RefCell,
1675        path::Path,
1676        rc::Rc,
1677        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1678        time::Duration,
1679    };
1680    use util::path;
1681
1682    fn init_test(cx: &mut TestAppContext) {
1683        env_logger::try_init().ok();
1684        cx.update(|cx| {
1685            let settings_store = SettingsStore::test(cx);
1686            cx.set_global(settings_store);
1687            Project::init_settings(cx);
1688            language::init(cx);
1689        });
1690    }
1691
1692    #[gpui::test]
1693    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1694        init_test(cx);
1695
1696        let fs = FakeFs::new(cx.executor());
1697        let project = Project::test(fs, [], cx).await;
1698        let connection = Rc::new(FakeAgentConnection::new());
1699        let thread = cx
1700            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1701            .await
1702            .unwrap();
1703
1704        // Test creating a new user message
1705        thread.update(cx, |thread, cx| {
1706            thread.push_user_content_block(
1707                None,
1708                acp::ContentBlock::Text(acp::TextContent {
1709                    annotations: None,
1710                    text: "Hello, ".to_string(),
1711                }),
1712                cx,
1713            );
1714        });
1715
1716        thread.update(cx, |thread, cx| {
1717            assert_eq!(thread.entries.len(), 1);
1718            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1719                assert_eq!(user_msg.id, None);
1720                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1721            } else {
1722                panic!("Expected UserMessage");
1723            }
1724        });
1725
1726        // Test appending to existing user message
1727        let message_1_id = UserMessageId::new();
1728        thread.update(cx, |thread, cx| {
1729            thread.push_user_content_block(
1730                Some(message_1_id.clone()),
1731                acp::ContentBlock::Text(acp::TextContent {
1732                    annotations: None,
1733                    text: "world!".to_string(),
1734                }),
1735                cx,
1736            );
1737        });
1738
1739        thread.update(cx, |thread, cx| {
1740            assert_eq!(thread.entries.len(), 1);
1741            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1742                assert_eq!(user_msg.id, Some(message_1_id));
1743                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1744            } else {
1745                panic!("Expected UserMessage");
1746            }
1747        });
1748
1749        // Test creating new user message after assistant message
1750        thread.update(cx, |thread, cx| {
1751            thread.push_assistant_content_block(
1752                acp::ContentBlock::Text(acp::TextContent {
1753                    annotations: None,
1754                    text: "Assistant response".to_string(),
1755                }),
1756                false,
1757                cx,
1758            );
1759        });
1760
1761        let message_2_id = UserMessageId::new();
1762        thread.update(cx, |thread, cx| {
1763            thread.push_user_content_block(
1764                Some(message_2_id.clone()),
1765                acp::ContentBlock::Text(acp::TextContent {
1766                    annotations: None,
1767                    text: "New user message".to_string(),
1768                }),
1769                cx,
1770            );
1771        });
1772
1773        thread.update(cx, |thread, cx| {
1774            assert_eq!(thread.entries.len(), 3);
1775            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1776                assert_eq!(user_msg.id, Some(message_2_id));
1777                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1778            } else {
1779                panic!("Expected UserMessage at index 2");
1780            }
1781        });
1782    }
1783
1784    #[gpui::test]
1785    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1786        init_test(cx);
1787
1788        let fs = FakeFs::new(cx.executor());
1789        let project = Project::test(fs, [], cx).await;
1790        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1791            |_, thread, mut cx| {
1792                async move {
1793                    thread.update(&mut cx, |thread, cx| {
1794                        thread
1795                            .handle_session_update(
1796                                acp::SessionUpdate::AgentThoughtChunk {
1797                                    content: "Thinking ".into(),
1798                                },
1799                                cx,
1800                            )
1801                            .unwrap();
1802                        thread
1803                            .handle_session_update(
1804                                acp::SessionUpdate::AgentThoughtChunk {
1805                                    content: "hard!".into(),
1806                                },
1807                                cx,
1808                            )
1809                            .unwrap();
1810                    })?;
1811                    Ok(acp::PromptResponse {
1812                        stop_reason: acp::StopReason::EndTurn,
1813                    })
1814                }
1815                .boxed_local()
1816            },
1817        ));
1818
1819        let thread = cx
1820            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1821            .await
1822            .unwrap();
1823
1824        thread
1825            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1826            .await
1827            .unwrap();
1828
1829        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1830        assert_eq!(
1831            output,
1832            indoc! {r#"
1833            ## User
1834
1835            Hello from Zed!
1836
1837            ## Assistant
1838
1839            <thinking>
1840            Thinking hard!
1841            </thinking>
1842
1843            "#}
1844        );
1845    }
1846
1847    #[gpui::test]
1848    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1849        init_test(cx);
1850
1851        let fs = FakeFs::new(cx.executor());
1852        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1853            .await;
1854        let project = Project::test(fs.clone(), [], cx).await;
1855        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1856        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1857        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1858            move |_, thread, mut cx| {
1859                let read_file_tx = read_file_tx.clone();
1860                async move {
1861                    let content = thread
1862                        .update(&mut cx, |thread, cx| {
1863                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1864                        })
1865                        .unwrap()
1866                        .await
1867                        .unwrap();
1868                    assert_eq!(content, "one\ntwo\nthree\n");
1869                    read_file_tx.take().unwrap().send(()).unwrap();
1870                    thread
1871                        .update(&mut cx, |thread, cx| {
1872                            thread.write_text_file(
1873                                path!("/tmp/foo").into(),
1874                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1875                                cx,
1876                            )
1877                        })
1878                        .unwrap()
1879                        .await
1880                        .unwrap();
1881                    Ok(acp::PromptResponse {
1882                        stop_reason: acp::StopReason::EndTurn,
1883                    })
1884                }
1885                .boxed_local()
1886            },
1887        ));
1888
1889        let (worktree, pathbuf) = project
1890            .update(cx, |project, cx| {
1891                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1892            })
1893            .await
1894            .unwrap();
1895        let buffer = project
1896            .update(cx, |project, cx| {
1897                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1898            })
1899            .await
1900            .unwrap();
1901
1902        let thread = cx
1903            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1904            .await
1905            .unwrap();
1906
1907        let request = thread.update(cx, |thread, cx| {
1908            thread.send_raw("Extend the count in /tmp/foo", cx)
1909        });
1910        read_file_rx.await.ok();
1911        buffer.update(cx, |buffer, cx| {
1912            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1913        });
1914        cx.run_until_parked();
1915        assert_eq!(
1916            buffer.read_with(cx, |buffer, _| buffer.text()),
1917            "zero\none\ntwo\nthree\nfour\nfive\n"
1918        );
1919        assert_eq!(
1920            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1921            "zero\none\ntwo\nthree\nfour\nfive\n"
1922        );
1923        request.await.unwrap();
1924    }
1925
1926    #[gpui::test]
1927    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1928        init_test(cx);
1929
1930        let fs = FakeFs::new(cx.executor());
1931        let project = Project::test(fs, [], cx).await;
1932        let id = acp::ToolCallId("test".into());
1933
1934        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1935            let id = id.clone();
1936            move |_, thread, mut cx| {
1937                let id = id.clone();
1938                async move {
1939                    thread
1940                        .update(&mut cx, |thread, cx| {
1941                            thread.handle_session_update(
1942                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1943                                    id: id.clone(),
1944                                    title: "Label".into(),
1945                                    kind: acp::ToolKind::Fetch,
1946                                    status: acp::ToolCallStatus::InProgress,
1947                                    content: vec![],
1948                                    locations: vec![],
1949                                    raw_input: None,
1950                                    raw_output: None,
1951                                }),
1952                                cx,
1953                            )
1954                        })
1955                        .unwrap()
1956                        .unwrap();
1957                    Ok(acp::PromptResponse {
1958                        stop_reason: acp::StopReason::EndTurn,
1959                    })
1960                }
1961                .boxed_local()
1962            }
1963        }));
1964
1965        let thread = cx
1966            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1967            .await
1968            .unwrap();
1969
1970        let request = thread.update(cx, |thread, cx| {
1971            thread.send_raw("Fetch https://example.com", cx)
1972        });
1973
1974        run_until_first_tool_call(&thread, cx).await;
1975
1976        thread.read_with(cx, |thread, _| {
1977            assert!(matches!(
1978                thread.entries[1],
1979                AgentThreadEntry::ToolCall(ToolCall {
1980                    status: ToolCallStatus::InProgress,
1981                    ..
1982                })
1983            ));
1984        });
1985
1986        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1987
1988        thread.read_with(cx, |thread, _| {
1989            assert!(matches!(
1990                &thread.entries[1],
1991                AgentThreadEntry::ToolCall(ToolCall {
1992                    status: ToolCallStatus::Canceled,
1993                    ..
1994                })
1995            ));
1996        });
1997
1998        thread
1999            .update(cx, |thread, cx| {
2000                thread.handle_session_update(
2001                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2002                        id,
2003                        fields: acp::ToolCallUpdateFields {
2004                            status: Some(acp::ToolCallStatus::Completed),
2005                            ..Default::default()
2006                        },
2007                    }),
2008                    cx,
2009                )
2010            })
2011            .unwrap();
2012
2013        request.await.unwrap();
2014
2015        thread.read_with(cx, |thread, _| {
2016            assert!(matches!(
2017                thread.entries[1],
2018                AgentThreadEntry::ToolCall(ToolCall {
2019                    status: ToolCallStatus::Completed,
2020                    ..
2021                })
2022            ));
2023        });
2024    }
2025
2026    #[gpui::test]
2027    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2028        init_test(cx);
2029        let fs = FakeFs::new(cx.background_executor.clone());
2030        fs.insert_tree(path!("/test"), json!({})).await;
2031        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2032
2033        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2034            move |_, thread, mut cx| {
2035                async move {
2036                    thread
2037                        .update(&mut cx, |thread, cx| {
2038                            thread.handle_session_update(
2039                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2040                                    id: acp::ToolCallId("test".into()),
2041                                    title: "Label".into(),
2042                                    kind: acp::ToolKind::Edit,
2043                                    status: acp::ToolCallStatus::Completed,
2044                                    content: vec![acp::ToolCallContent::Diff {
2045                                        diff: acp::Diff {
2046                                            path: "/test/test.txt".into(),
2047                                            old_text: None,
2048                                            new_text: "foo".into(),
2049                                        },
2050                                    }],
2051                                    locations: vec![],
2052                                    raw_input: None,
2053                                    raw_output: None,
2054                                }),
2055                                cx,
2056                            )
2057                        })
2058                        .unwrap()
2059                        .unwrap();
2060                    Ok(acp::PromptResponse {
2061                        stop_reason: acp::StopReason::EndTurn,
2062                    })
2063                }
2064                .boxed_local()
2065            }
2066        }));
2067
2068        let thread = cx
2069            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2070            .await
2071            .unwrap();
2072
2073        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2074            .await
2075            .unwrap();
2076
2077        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2078    }
2079
2080    #[gpui::test(iterations = 10)]
2081    async fn test_checkpoints(cx: &mut TestAppContext) {
2082        init_test(cx);
2083        let fs = FakeFs::new(cx.background_executor.clone());
2084        fs.insert_tree(
2085            path!("/test"),
2086            json!({
2087                ".git": {}
2088            }),
2089        )
2090        .await;
2091        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2092
2093        let simulate_changes = Arc::new(AtomicBool::new(true));
2094        let next_filename = Arc::new(AtomicUsize::new(0));
2095        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2096            let simulate_changes = simulate_changes.clone();
2097            let next_filename = next_filename.clone();
2098            let fs = fs.clone();
2099            move |request, thread, mut cx| {
2100                let fs = fs.clone();
2101                let simulate_changes = simulate_changes.clone();
2102                let next_filename = next_filename.clone();
2103                async move {
2104                    if simulate_changes.load(SeqCst) {
2105                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2106                        fs.write(Path::new(&filename), b"").await?;
2107                    }
2108
2109                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2110                        panic!("expected text content block");
2111                    };
2112                    thread.update(&mut cx, |thread, cx| {
2113                        thread
2114                            .handle_session_update(
2115                                acp::SessionUpdate::AgentMessageChunk {
2116                                    content: content.text.to_uppercase().into(),
2117                                },
2118                                cx,
2119                            )
2120                            .unwrap();
2121                    })?;
2122                    Ok(acp::PromptResponse {
2123                        stop_reason: acp::StopReason::EndTurn,
2124                    })
2125                }
2126                .boxed_local()
2127            }
2128        }));
2129        let thread = cx
2130            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2131            .await
2132            .unwrap();
2133
2134        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2135            .await
2136            .unwrap();
2137        thread.read_with(cx, |thread, cx| {
2138            assert_eq!(
2139                thread.to_markdown(cx),
2140                indoc! {"
2141                    ## User (checkpoint)
2142
2143                    Lorem
2144
2145                    ## Assistant
2146
2147                    LOREM
2148
2149                "}
2150            );
2151        });
2152        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2153
2154        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2155            .await
2156            .unwrap();
2157        thread.read_with(cx, |thread, cx| {
2158            assert_eq!(
2159                thread.to_markdown(cx),
2160                indoc! {"
2161                    ## User (checkpoint)
2162
2163                    Lorem
2164
2165                    ## Assistant
2166
2167                    LOREM
2168
2169                    ## User (checkpoint)
2170
2171                    ipsum
2172
2173                    ## Assistant
2174
2175                    IPSUM
2176
2177                "}
2178            );
2179        });
2180        assert_eq!(
2181            fs.files(),
2182            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2183        );
2184
2185        // Checkpoint isn't stored when there are no changes.
2186        simulate_changes.store(false, SeqCst);
2187        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2188            .await
2189            .unwrap();
2190        thread.read_with(cx, |thread, cx| {
2191            assert_eq!(
2192                thread.to_markdown(cx),
2193                indoc! {"
2194                    ## User (checkpoint)
2195
2196                    Lorem
2197
2198                    ## Assistant
2199
2200                    LOREM
2201
2202                    ## User (checkpoint)
2203
2204                    ipsum
2205
2206                    ## Assistant
2207
2208                    IPSUM
2209
2210                    ## User
2211
2212                    dolor
2213
2214                    ## Assistant
2215
2216                    DOLOR
2217
2218                "}
2219            );
2220        });
2221        assert_eq!(
2222            fs.files(),
2223            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2224        );
2225
2226        // Rewinding the conversation truncates the history and restores the checkpoint.
2227        thread
2228            .update(cx, |thread, cx| {
2229                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2230                    panic!("unexpected entries {:?}", thread.entries)
2231                };
2232                thread.rewind(message.id.clone().unwrap(), cx)
2233            })
2234            .await
2235            .unwrap();
2236        thread.read_with(cx, |thread, cx| {
2237            assert_eq!(
2238                thread.to_markdown(cx),
2239                indoc! {"
2240                    ## User (checkpoint)
2241
2242                    Lorem
2243
2244                    ## Assistant
2245
2246                    LOREM
2247
2248                "}
2249            );
2250        });
2251        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2252    }
2253
2254    async fn run_until_first_tool_call(
2255        thread: &Entity<AcpThread>,
2256        cx: &mut TestAppContext,
2257    ) -> usize {
2258        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2259
2260        let subscription = cx.update(|cx| {
2261            cx.subscribe(thread, move |thread, _, cx| {
2262                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2263                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2264                        return tx.try_send(ix).unwrap();
2265                    }
2266                }
2267            })
2268        });
2269
2270        select! {
2271            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2272                panic!("Timeout waiting for tool call")
2273            }
2274            ix = rx.next().fuse() => {
2275                drop(subscription);
2276                ix.unwrap()
2277            }
2278        }
2279    }
2280
2281    #[derive(Clone, Default)]
2282    struct FakeAgentConnection {
2283        auth_methods: Vec<acp::AuthMethod>,
2284        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2285        on_user_message: Option<
2286            Rc<
2287                dyn Fn(
2288                        acp::PromptRequest,
2289                        WeakEntity<AcpThread>,
2290                        AsyncApp,
2291                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2292                    + 'static,
2293            >,
2294        >,
2295    }
2296
2297    impl FakeAgentConnection {
2298        fn new() -> Self {
2299            Self {
2300                auth_methods: Vec::new(),
2301                on_user_message: None,
2302                sessions: Arc::default(),
2303            }
2304        }
2305
2306        #[expect(unused)]
2307        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2308            self.auth_methods = auth_methods;
2309            self
2310        }
2311
2312        fn on_user_message(
2313            mut self,
2314            handler: impl Fn(
2315                acp::PromptRequest,
2316                WeakEntity<AcpThread>,
2317                AsyncApp,
2318            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2319            + 'static,
2320        ) -> Self {
2321            self.on_user_message.replace(Rc::new(handler));
2322            self
2323        }
2324    }
2325
2326    impl AgentConnection for FakeAgentConnection {
2327        fn auth_methods(&self) -> &[acp::AuthMethod] {
2328            &self.auth_methods
2329        }
2330
2331        fn new_thread(
2332            self: Rc<Self>,
2333            project: Entity<Project>,
2334            _cwd: &Path,
2335            cx: &mut App,
2336        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2337            let session_id = acp::SessionId(
2338                rand::thread_rng()
2339                    .sample_iter(&rand::distributions::Alphanumeric)
2340                    .take(7)
2341                    .map(char::from)
2342                    .collect::<String>()
2343                    .into(),
2344            );
2345            let thread =
2346                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2347            self.sessions.lock().insert(session_id, thread.downgrade());
2348            Task::ready(Ok(thread))
2349        }
2350
2351        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2352            if self.auth_methods().iter().any(|m| m.id == method) {
2353                Task::ready(Ok(()))
2354            } else {
2355                Task::ready(Err(anyhow!("Invalid Auth Method")))
2356            }
2357        }
2358
2359        fn prompt(
2360            &self,
2361            _id: Option<UserMessageId>,
2362            params: acp::PromptRequest,
2363            cx: &mut App,
2364        ) -> Task<gpui::Result<acp::PromptResponse>> {
2365            let sessions = self.sessions.lock();
2366            let thread = sessions.get(&params.session_id).unwrap();
2367            if let Some(handler) = &self.on_user_message {
2368                let handler = handler.clone();
2369                let thread = thread.clone();
2370                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2371            } else {
2372                Task::ready(Ok(acp::PromptResponse {
2373                    stop_reason: acp::StopReason::EndTurn,
2374                }))
2375            }
2376        }
2377
2378        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2379            let sessions = self.sessions.lock();
2380            let thread = sessions.get(&session_id).unwrap().clone();
2381
2382            cx.spawn(async move |cx| {
2383                thread
2384                    .update(cx, |thread, cx| thread.cancel(cx))
2385                    .unwrap()
2386                    .await
2387            })
2388            .detach();
2389        }
2390
2391        fn session_editor(
2392            &self,
2393            session_id: &acp::SessionId,
2394            _cx: &mut App,
2395        ) -> Option<Rc<dyn AgentSessionEditor>> {
2396            Some(Rc::new(FakeAgentSessionEditor {
2397                _session_id: session_id.clone(),
2398            }))
2399        }
2400
2401        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2402            self
2403        }
2404    }
2405
2406    struct FakeAgentSessionEditor {
2407        _session_id: acp::SessionId,
2408    }
2409
2410    impl AgentSessionEditor for FakeAgentSessionEditor {
2411        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2412            Task::ready(Ok(()))
2413        }
2414    }
2415}