acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9use serde::{Deserialize, Serialize};
  10pub use terminal::*;
  11
  12use action_log::ActionLog;
  13use agent_client_protocol as acp;
  14use anyhow::{Context as _, Result, anyhow};
  15use editor::Bias;
  16use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  17use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  18use itertools::Itertools;
  19use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  20use markdown::Markdown;
  21use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use ui::App;
  31use util::ResultExt;
  32
  33#[derive(Debug)]
  34pub struct UserMessage {
  35    pub id: Option<UserMessageId>,
  36    pub content: ContentBlock,
  37    pub chunks: Vec<acp::ContentBlock>,
  38    pub checkpoint: Option<Checkpoint>,
  39}
  40
  41#[derive(Debug)]
  42pub struct Checkpoint {
  43    git_checkpoint: GitStoreCheckpoint,
  44    pub show: bool,
  45}
  46
  47impl UserMessage {
  48    fn to_markdown(&self, cx: &App) -> String {
  49        let mut markdown = String::new();
  50        if self
  51            .checkpoint
  52            .as_ref()
  53            .is_some_and(|checkpoint| checkpoint.show)
  54        {
  55            writeln!(markdown, "## User (checkpoint)").unwrap();
  56        } else {
  57            writeln!(markdown, "## User").unwrap();
  58        }
  59        writeln!(markdown).unwrap();
  60        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  61        writeln!(markdown).unwrap();
  62        markdown
  63    }
  64}
  65
  66#[derive(Debug, PartialEq)]
  67pub struct AssistantMessage {
  68    pub chunks: Vec<AssistantMessageChunk>,
  69}
  70
  71impl AssistantMessage {
  72    pub fn to_markdown(&self, cx: &App) -> String {
  73        format!(
  74            "## Assistant\n\n{}\n\n",
  75            self.chunks
  76                .iter()
  77                .map(|chunk| chunk.to_markdown(cx))
  78                .join("\n\n")
  79        )
  80    }
  81}
  82
  83#[derive(Debug, PartialEq)]
  84pub enum AssistantMessageChunk {
  85    Message { block: ContentBlock },
  86    Thought { block: ContentBlock },
  87}
  88
  89impl AssistantMessageChunk {
  90    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  91        Self::Message {
  92            block: ContentBlock::new(chunk.into(), language_registry, cx),
  93        }
  94    }
  95
  96    fn to_markdown(&self, cx: &App) -> String {
  97        match self {
  98            Self::Message { block } => block.to_markdown(cx).to_string(),
  99            Self::Thought { block } => {
 100                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 101            }
 102        }
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub enum AgentThreadEntry {
 108    UserMessage(UserMessage),
 109    AssistantMessage(AssistantMessage),
 110    ToolCall(ToolCall),
 111}
 112
 113impl AgentThreadEntry {
 114    pub fn to_markdown(&self, cx: &App) -> String {
 115        match self {
 116            Self::UserMessage(message) => message.to_markdown(cx),
 117            Self::AssistantMessage(message) => message.to_markdown(cx),
 118            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 119        }
 120    }
 121
 122    pub fn user_message(&self) -> Option<&UserMessage> {
 123        if let AgentThreadEntry::UserMessage(message) = self {
 124            Some(message)
 125        } else {
 126            None
 127        }
 128    }
 129
 130    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 131        if let AgentThreadEntry::ToolCall(call) = self {
 132            itertools::Either::Left(call.diffs())
 133        } else {
 134            itertools::Either::Right(std::iter::empty())
 135        }
 136    }
 137
 138    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 139        if let AgentThreadEntry::ToolCall(call) = self {
 140            itertools::Either::Left(call.terminals())
 141        } else {
 142            itertools::Either::Right(std::iter::empty())
 143        }
 144    }
 145
 146    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 147        if let AgentThreadEntry::ToolCall(ToolCall {
 148            locations,
 149            resolved_locations,
 150            ..
 151        }) = self
 152        {
 153            Some((
 154                locations.get(ix)?.clone(),
 155                resolved_locations.get(ix)?.clone()?,
 156            ))
 157        } else {
 158            None
 159        }
 160    }
 161}
 162
 163#[derive(Debug)]
 164pub struct ToolCall {
 165    pub id: acp::ToolCallId,
 166    pub label: Entity<Markdown>,
 167    pub kind: acp::ToolKind,
 168    pub content: Vec<ToolCallContent>,
 169    pub status: ToolCallStatus,
 170    pub locations: Vec<acp::ToolCallLocation>,
 171    pub resolved_locations: Vec<Option<AgentLocation>>,
 172    pub raw_input: Option<serde_json::Value>,
 173    pub raw_output: Option<serde_json::Value>,
 174}
 175
 176impl ToolCall {
 177    fn from_acp(
 178        tool_call: acp::ToolCall,
 179        status: ToolCallStatus,
 180        language_registry: Arc<LanguageRegistry>,
 181        cx: &mut App,
 182    ) -> Self {
 183        Self {
 184            id: tool_call.id,
 185            label: cx.new(|cx| {
 186                Markdown::new(
 187                    tool_call.title.into(),
 188                    Some(language_registry.clone()),
 189                    None,
 190                    cx,
 191                )
 192            }),
 193            kind: tool_call.kind,
 194            content: tool_call
 195                .content
 196                .into_iter()
 197                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 198                .collect(),
 199            locations: tool_call.locations,
 200            resolved_locations: Vec::default(),
 201            status,
 202            raw_input: tool_call.raw_input,
 203            raw_output: tool_call.raw_output,
 204        }
 205    }
 206
 207    fn update_fields(
 208        &mut self,
 209        fields: acp::ToolCallUpdateFields,
 210        language_registry: Arc<LanguageRegistry>,
 211        cx: &mut App,
 212    ) {
 213        let acp::ToolCallUpdateFields {
 214            kind,
 215            status,
 216            title,
 217            content,
 218            locations,
 219            raw_input,
 220            raw_output,
 221        } = fields;
 222
 223        if let Some(kind) = kind {
 224            self.kind = kind;
 225        }
 226
 227        if let Some(status) = status {
 228            self.status = status.into();
 229        }
 230
 231        if let Some(title) = title {
 232            self.label.update(cx, |label, cx| {
 233                label.replace(title, cx);
 234            });
 235        }
 236
 237        if let Some(content) = content {
 238            self.content = content
 239                .into_iter()
 240                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 241                .collect();
 242        }
 243
 244        if let Some(locations) = locations {
 245            self.locations = locations;
 246        }
 247
 248        if let Some(raw_input) = raw_input {
 249            self.raw_input = Some(raw_input);
 250        }
 251
 252        if let Some(raw_output) = raw_output {
 253            if self.content.is_empty()
 254                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 255            {
 256                self.content
 257                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 258                        markdown,
 259                    }));
 260            }
 261            self.raw_output = Some(raw_output);
 262        }
 263    }
 264
 265    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 266        self.content.iter().filter_map(|content| match content {
 267            ToolCallContent::Diff(diff) => Some(diff),
 268            ToolCallContent::ContentBlock(_) => None,
 269            ToolCallContent::Terminal(_) => None,
 270        })
 271    }
 272
 273    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 274        self.content.iter().filter_map(|content| match content {
 275            ToolCallContent::Terminal(terminal) => Some(terminal),
 276            ToolCallContent::ContentBlock(_) => None,
 277            ToolCallContent::Diff(_) => None,
 278        })
 279    }
 280
 281    fn to_markdown(&self, cx: &App) -> String {
 282        let mut markdown = format!(
 283            "**Tool Call: {}**\nStatus: {}\n\n",
 284            self.label.read(cx).source(),
 285            self.status
 286        );
 287        for content in &self.content {
 288            markdown.push_str(content.to_markdown(cx).as_str());
 289            markdown.push_str("\n\n");
 290        }
 291        markdown
 292    }
 293
 294    async fn resolve_location(
 295        location: acp::ToolCallLocation,
 296        project: WeakEntity<Project>,
 297        cx: &mut AsyncApp,
 298    ) -> Option<AgentLocation> {
 299        let buffer = project
 300            .update(cx, |project, cx| {
 301                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 302                    Some(project.open_buffer(path, cx))
 303                } else {
 304                    None
 305                }
 306            })
 307            .ok()??;
 308        let buffer = buffer.await.log_err()?;
 309        let position = buffer
 310            .update(cx, |buffer, _| {
 311                if let Some(row) = location.line {
 312                    let snapshot = buffer.snapshot();
 313                    let column = snapshot.indent_size_for_line(row).len;
 314                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 315                    snapshot.anchor_before(point)
 316                } else {
 317                    Anchor::MIN
 318                }
 319            })
 320            .ok()?;
 321
 322        Some(AgentLocation {
 323            buffer: buffer.downgrade(),
 324            position,
 325        })
 326    }
 327
 328    fn resolve_locations(
 329        &self,
 330        project: Entity<Project>,
 331        cx: &mut App,
 332    ) -> Task<Vec<Option<AgentLocation>>> {
 333        let locations = self.locations.clone();
 334        project.update(cx, |_, cx| {
 335            cx.spawn(async move |project, cx| {
 336                let mut new_locations = Vec::new();
 337                for location in locations {
 338                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 339                }
 340                new_locations
 341            })
 342        })
 343    }
 344}
 345
 346#[derive(Debug)]
 347pub enum ToolCallStatus {
 348    /// The tool call hasn't started running yet, but we start showing it to
 349    /// the user.
 350    Pending,
 351    /// The tool call is waiting for confirmation from the user.
 352    WaitingForConfirmation {
 353        options: Vec<acp::PermissionOption>,
 354        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 355    },
 356    /// The tool call is currently running.
 357    InProgress,
 358    /// The tool call completed successfully.
 359    Completed,
 360    /// The tool call failed.
 361    Failed,
 362    /// The user rejected the tool call.
 363    Rejected,
 364    /// The user canceled generation so the tool call was canceled.
 365    Canceled,
 366}
 367
 368impl From<acp::ToolCallStatus> for ToolCallStatus {
 369    fn from(status: acp::ToolCallStatus) -> Self {
 370        match status {
 371            acp::ToolCallStatus::Pending => Self::Pending,
 372            acp::ToolCallStatus::InProgress => Self::InProgress,
 373            acp::ToolCallStatus::Completed => Self::Completed,
 374            acp::ToolCallStatus::Failed => Self::Failed,
 375        }
 376    }
 377}
 378
 379impl Display for ToolCallStatus {
 380    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 381        write!(
 382            f,
 383            "{}",
 384            match self {
 385                ToolCallStatus::Pending => "Pending",
 386                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 387                ToolCallStatus::InProgress => "In Progress",
 388                ToolCallStatus::Completed => "Completed",
 389                ToolCallStatus::Failed => "Failed",
 390                ToolCallStatus::Rejected => "Rejected",
 391                ToolCallStatus::Canceled => "Canceled",
 392            }
 393        )
 394    }
 395}
 396
 397#[derive(Debug, PartialEq, Clone)]
 398pub enum ContentBlock {
 399    Empty,
 400    Markdown { markdown: Entity<Markdown> },
 401    ResourceLink { resource_link: acp::ResourceLink },
 402}
 403
 404impl ContentBlock {
 405    pub fn new(
 406        block: acp::ContentBlock,
 407        language_registry: &Arc<LanguageRegistry>,
 408        cx: &mut App,
 409    ) -> Self {
 410        let mut this = Self::Empty;
 411        this.append(block, language_registry, cx);
 412        this
 413    }
 414
 415    pub fn new_combined(
 416        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 417        language_registry: Arc<LanguageRegistry>,
 418        cx: &mut App,
 419    ) -> Self {
 420        let mut this = Self::Empty;
 421        for block in blocks {
 422            this.append(block, &language_registry, cx);
 423        }
 424        this
 425    }
 426
 427    pub fn append(
 428        &mut self,
 429        block: acp::ContentBlock,
 430        language_registry: &Arc<LanguageRegistry>,
 431        cx: &mut App,
 432    ) {
 433        if matches!(self, ContentBlock::Empty)
 434            && let acp::ContentBlock::ResourceLink(resource_link) = block
 435        {
 436            *self = ContentBlock::ResourceLink { resource_link };
 437            return;
 438        }
 439
 440        let new_content = self.block_string_contents(block);
 441
 442        match self {
 443            ContentBlock::Empty => {
 444                *self = Self::create_markdown_block(new_content, language_registry, cx);
 445            }
 446            ContentBlock::Markdown { markdown } => {
 447                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 448            }
 449            ContentBlock::ResourceLink { resource_link } => {
 450                let existing_content = Self::resource_link_md(&resource_link.uri);
 451                let combined = format!("{}\n{}", existing_content, new_content);
 452
 453                *self = Self::create_markdown_block(combined, language_registry, cx);
 454            }
 455        }
 456    }
 457
 458    fn create_markdown_block(
 459        content: String,
 460        language_registry: &Arc<LanguageRegistry>,
 461        cx: &mut App,
 462    ) -> ContentBlock {
 463        ContentBlock::Markdown {
 464            markdown: cx
 465                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 466        }
 467    }
 468
 469    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 470        match block {
 471            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 472            acp::ContentBlock::ResourceLink(resource_link) => {
 473                Self::resource_link_md(&resource_link.uri)
 474            }
 475            acp::ContentBlock::Resource(acp::EmbeddedResource {
 476                resource:
 477                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 478                        uri,
 479                        ..
 480                    }),
 481                ..
 482            }) => Self::resource_link_md(&uri),
 483            acp::ContentBlock::Image(image) => Self::image_md(&image),
 484            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 485        }
 486    }
 487
 488    fn resource_link_md(uri: &str) -> String {
 489        if let Some(uri) = MentionUri::parse(uri).log_err() {
 490            uri.as_link().to_string()
 491        } else {
 492            uri.to_string()
 493        }
 494    }
 495
 496    fn image_md(_image: &acp::ImageContent) -> String {
 497        "`Image`".into()
 498    }
 499
 500    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 501        match self {
 502            ContentBlock::Empty => "",
 503            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 504            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 505        }
 506    }
 507
 508    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 509        match self {
 510            ContentBlock::Empty => None,
 511            ContentBlock::Markdown { markdown } => Some(markdown),
 512            ContentBlock::ResourceLink { .. } => None,
 513        }
 514    }
 515
 516    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 517        match self {
 518            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 519            _ => None,
 520        }
 521    }
 522}
 523
 524#[derive(Debug)]
 525pub enum ToolCallContent {
 526    ContentBlock(ContentBlock),
 527    Diff(Entity<Diff>),
 528    Terminal(Entity<Terminal>),
 529}
 530
 531impl ToolCallContent {
 532    pub fn from_acp(
 533        content: acp::ToolCallContent,
 534        language_registry: Arc<LanguageRegistry>,
 535        cx: &mut App,
 536    ) -> Self {
 537        match content {
 538            acp::ToolCallContent::Content { content } => {
 539                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 540            }
 541            acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
 542                Diff::finalized(
 543                    diff.path,
 544                    diff.old_text,
 545                    diff.new_text,
 546                    language_registry,
 547                    cx,
 548                )
 549            })),
 550        }
 551    }
 552
 553    pub fn to_markdown(&self, cx: &App) -> String {
 554        match self {
 555            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 556            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 557            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 558        }
 559    }
 560}
 561
 562#[derive(Debug, PartialEq)]
 563pub enum ToolCallUpdate {
 564    UpdateFields(acp::ToolCallUpdate),
 565    UpdateDiff(ToolCallUpdateDiff),
 566    UpdateTerminal(ToolCallUpdateTerminal),
 567}
 568
 569impl ToolCallUpdate {
 570    fn id(&self) -> &acp::ToolCallId {
 571        match self {
 572            Self::UpdateFields(update) => &update.id,
 573            Self::UpdateDiff(diff) => &diff.id,
 574            Self::UpdateTerminal(terminal) => &terminal.id,
 575        }
 576    }
 577}
 578
 579impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 580    fn from(update: acp::ToolCallUpdate) -> Self {
 581        Self::UpdateFields(update)
 582    }
 583}
 584
 585impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 586    fn from(diff: ToolCallUpdateDiff) -> Self {
 587        Self::UpdateDiff(diff)
 588    }
 589}
 590
 591#[derive(Debug, PartialEq)]
 592pub struct ToolCallUpdateDiff {
 593    pub id: acp::ToolCallId,
 594    pub diff: Entity<Diff>,
 595}
 596
 597impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 598    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 599        Self::UpdateTerminal(terminal)
 600    }
 601}
 602
 603#[derive(Debug, PartialEq)]
 604pub struct ToolCallUpdateTerminal {
 605    pub id: acp::ToolCallId,
 606    pub terminal: Entity<Terminal>,
 607}
 608
 609#[derive(Debug, Default)]
 610pub struct Plan {
 611    pub entries: Vec<PlanEntry>,
 612}
 613
 614#[derive(Debug)]
 615pub struct PlanStats<'a> {
 616    pub in_progress_entry: Option<&'a PlanEntry>,
 617    pub pending: u32,
 618    pub completed: u32,
 619}
 620
 621impl Plan {
 622    pub fn is_empty(&self) -> bool {
 623        self.entries.is_empty()
 624    }
 625
 626    pub fn stats(&self) -> PlanStats<'_> {
 627        let mut stats = PlanStats {
 628            in_progress_entry: None,
 629            pending: 0,
 630            completed: 0,
 631        };
 632
 633        for entry in &self.entries {
 634            match &entry.status {
 635                acp::PlanEntryStatus::Pending => {
 636                    stats.pending += 1;
 637                }
 638                acp::PlanEntryStatus::InProgress => {
 639                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 640                }
 641                acp::PlanEntryStatus::Completed => {
 642                    stats.completed += 1;
 643                }
 644            }
 645        }
 646
 647        stats
 648    }
 649}
 650
 651#[derive(Debug)]
 652pub struct PlanEntry {
 653    pub content: Entity<Markdown>,
 654    pub priority: acp::PlanEntryPriority,
 655    pub status: acp::PlanEntryStatus,
 656}
 657
 658impl PlanEntry {
 659    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 660        Self {
 661            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 662            priority: entry.priority,
 663            status: entry.status,
 664        }
 665    }
 666}
 667
 668#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 669pub struct TokenUsage {
 670    pub max_tokens: u64,
 671    pub used_tokens: u64,
 672}
 673
 674#[derive(Debug, Clone)]
 675pub struct RetryStatus {
 676    pub last_error: SharedString,
 677    pub attempt: usize,
 678    pub max_attempts: usize,
 679    pub started_at: Instant,
 680    pub duration: Duration,
 681}
 682
 683pub struct AcpThread {
 684    title: SharedString,
 685    entries: Vec<AgentThreadEntry>,
 686    plan: Plan,
 687    project: Entity<Project>,
 688    action_log: Entity<ActionLog>,
 689    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 690    send_task: Option<Task<()>>,
 691    connection: Rc<dyn AgentConnection>,
 692    session_id: acp::SessionId,
 693    token_usage: Option<TokenUsage>,
 694}
 695
 696#[derive(Debug)]
 697pub enum AcpThreadEvent {
 698    NewEntry,
 699    TitleUpdated,
 700    TokenUsageUpdated,
 701    EntryUpdated(usize),
 702    EntriesRemoved(Range<usize>),
 703    ToolAuthorizationRequired,
 704    Retry(RetryStatus),
 705    Stopped,
 706    Error,
 707    ServerExited(ExitStatus),
 708}
 709
 710impl EventEmitter<AcpThreadEvent> for AcpThread {}
 711
 712#[derive(PartialEq, Eq)]
 713pub enum ThreadStatus {
 714    Idle,
 715    WaitingForToolConfirmation,
 716    Generating,
 717}
 718
 719#[derive(Debug, Clone)]
 720pub enum LoadError {
 721    Unsupported {
 722        error_message: SharedString,
 723        upgrade_message: SharedString,
 724        upgrade_command: String,
 725    },
 726    Exited(i32),
 727    Other(SharedString),
 728}
 729
 730impl Display for LoadError {
 731    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 732        match self {
 733            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 734            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 735            LoadError::Other(msg) => write!(f, "{}", msg),
 736        }
 737    }
 738}
 739
 740impl Error for LoadError {}
 741
 742impl AcpThread {
 743    pub fn new(
 744        title: impl Into<SharedString>,
 745        connection: Rc<dyn AgentConnection>,
 746        project: Entity<Project>,
 747        action_log: Entity<ActionLog>,
 748        session_id: acp::SessionId,
 749    ) -> Self {
 750        Self {
 751            action_log,
 752            shared_buffers: Default::default(),
 753            entries: Default::default(),
 754            plan: Default::default(),
 755            title: title.into(),
 756            project,
 757            send_task: None,
 758            connection,
 759            session_id,
 760            token_usage: None,
 761        }
 762    }
 763
 764    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 765        &self.connection
 766    }
 767
 768    pub fn action_log(&self) -> &Entity<ActionLog> {
 769        &self.action_log
 770    }
 771
 772    pub fn project(&self) -> &Entity<Project> {
 773        &self.project
 774    }
 775
 776    pub fn title(&self) -> SharedString {
 777        self.title.clone()
 778    }
 779
 780    pub fn entries(&self) -> &[AgentThreadEntry] {
 781        &self.entries
 782    }
 783
 784    pub fn session_id(&self) -> &acp::SessionId {
 785        &self.session_id
 786    }
 787
 788    pub fn status(&self) -> ThreadStatus {
 789        if self.send_task.is_some() {
 790            if self.waiting_for_tool_confirmation() {
 791                ThreadStatus::WaitingForToolConfirmation
 792            } else {
 793                ThreadStatus::Generating
 794            }
 795        } else {
 796            ThreadStatus::Idle
 797        }
 798    }
 799
 800    pub fn token_usage(&self) -> Option<&TokenUsage> {
 801        self.token_usage.as_ref()
 802    }
 803
 804    pub fn has_pending_edit_tool_calls(&self) -> bool {
 805        for entry in self.entries.iter().rev() {
 806            match entry {
 807                AgentThreadEntry::UserMessage(_) => return false,
 808                AgentThreadEntry::ToolCall(
 809                    call @ ToolCall {
 810                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 811                        ..
 812                    },
 813                ) if call.diffs().next().is_some() => {
 814                    return true;
 815                }
 816                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 817            }
 818        }
 819
 820        false
 821    }
 822
 823    pub fn used_tools_since_last_user_message(&self) -> bool {
 824        for entry in self.entries.iter().rev() {
 825            match entry {
 826                AgentThreadEntry::UserMessage(..) => return false,
 827                AgentThreadEntry::AssistantMessage(..) => continue,
 828                AgentThreadEntry::ToolCall(..) => return true,
 829            }
 830        }
 831
 832        false
 833    }
 834
 835    pub fn handle_session_update(
 836        &mut self,
 837        update: acp::SessionUpdate,
 838        cx: &mut Context<Self>,
 839    ) -> Result<(), acp::Error> {
 840        match update {
 841            acp::SessionUpdate::UserMessageChunk { content } => {
 842                self.push_user_content_block(None, content, cx);
 843            }
 844            acp::SessionUpdate::AgentMessageChunk { content } => {
 845                self.push_assistant_content_block(content, false, cx);
 846            }
 847            acp::SessionUpdate::AgentThoughtChunk { content } => {
 848                self.push_assistant_content_block(content, true, cx);
 849            }
 850            acp::SessionUpdate::ToolCall(tool_call) => {
 851                self.upsert_tool_call(tool_call, cx)?;
 852            }
 853            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 854                self.update_tool_call(tool_call_update, cx)?;
 855            }
 856            acp::SessionUpdate::Plan(plan) => {
 857                self.update_plan(plan, cx);
 858            }
 859        }
 860        Ok(())
 861    }
 862
 863    pub fn push_user_content_block(
 864        &mut self,
 865        message_id: Option<UserMessageId>,
 866        chunk: acp::ContentBlock,
 867        cx: &mut Context<Self>,
 868    ) {
 869        let language_registry = self.project.read(cx).languages().clone();
 870        let entries_len = self.entries.len();
 871
 872        if let Some(last_entry) = self.entries.last_mut()
 873            && let AgentThreadEntry::UserMessage(UserMessage {
 874                id,
 875                content,
 876                chunks,
 877                ..
 878            }) = last_entry
 879        {
 880            *id = message_id.or(id.take());
 881            content.append(chunk.clone(), &language_registry, cx);
 882            chunks.push(chunk);
 883            let idx = entries_len - 1;
 884            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 885        } else {
 886            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 887            self.push_entry(
 888                AgentThreadEntry::UserMessage(UserMessage {
 889                    id: message_id,
 890                    content,
 891                    chunks: vec![chunk],
 892                    checkpoint: None,
 893                }),
 894                cx,
 895            );
 896        }
 897    }
 898
 899    pub fn push_assistant_content_block(
 900        &mut self,
 901        chunk: acp::ContentBlock,
 902        is_thought: bool,
 903        cx: &mut Context<Self>,
 904    ) {
 905        let language_registry = self.project.read(cx).languages().clone();
 906        let entries_len = self.entries.len();
 907        if let Some(last_entry) = self.entries.last_mut()
 908            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 909        {
 910            let idx = entries_len - 1;
 911            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 912            match (chunks.last_mut(), is_thought) {
 913                (Some(AssistantMessageChunk::Message { block }), false)
 914                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 915                    block.append(chunk, &language_registry, cx)
 916                }
 917                _ => {
 918                    let block = ContentBlock::new(chunk, &language_registry, cx);
 919                    if is_thought {
 920                        chunks.push(AssistantMessageChunk::Thought { block })
 921                    } else {
 922                        chunks.push(AssistantMessageChunk::Message { block })
 923                    }
 924                }
 925            }
 926        } else {
 927            let block = ContentBlock::new(chunk, &language_registry, cx);
 928            let chunk = if is_thought {
 929                AssistantMessageChunk::Thought { block }
 930            } else {
 931                AssistantMessageChunk::Message { block }
 932            };
 933
 934            self.push_entry(
 935                AgentThreadEntry::AssistantMessage(AssistantMessage {
 936                    chunks: vec![chunk],
 937                }),
 938                cx,
 939            );
 940        }
 941    }
 942
 943    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 944        self.entries.push(entry);
 945        cx.emit(AcpThreadEvent::NewEntry);
 946    }
 947
 948    pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
 949        self.title = title;
 950        cx.emit(AcpThreadEvent::TitleUpdated);
 951        Ok(())
 952    }
 953
 954    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
 955        self.token_usage = usage;
 956        cx.emit(AcpThreadEvent::TokenUsageUpdated);
 957    }
 958
 959    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
 960        cx.emit(AcpThreadEvent::Retry(status));
 961    }
 962
 963    pub fn update_tool_call(
 964        &mut self,
 965        update: impl Into<ToolCallUpdate>,
 966        cx: &mut Context<Self>,
 967    ) -> Result<()> {
 968        let update = update.into();
 969        let languages = self.project.read(cx).languages().clone();
 970
 971        let (ix, current_call) = self
 972            .tool_call_mut(update.id())
 973            .context("Tool call not found")?;
 974        match update {
 975            ToolCallUpdate::UpdateFields(update) => {
 976                let location_updated = update.fields.locations.is_some();
 977                current_call.update_fields(update.fields, languages, cx);
 978                if location_updated {
 979                    self.resolve_locations(update.id.clone(), cx);
 980                }
 981            }
 982            ToolCallUpdate::UpdateDiff(update) => {
 983                current_call.content.clear();
 984                current_call
 985                    .content
 986                    .push(ToolCallContent::Diff(update.diff));
 987            }
 988            ToolCallUpdate::UpdateTerminal(update) => {
 989                current_call.content.clear();
 990                current_call
 991                    .content
 992                    .push(ToolCallContent::Terminal(update.terminal));
 993            }
 994        }
 995
 996        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 997
 998        Ok(())
 999    }
1000
1001    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1002    pub fn upsert_tool_call(
1003        &mut self,
1004        tool_call: acp::ToolCall,
1005        cx: &mut Context<Self>,
1006    ) -> Result<(), acp::Error> {
1007        let status = tool_call.status.into();
1008        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1009    }
1010
1011    /// Fails if id does not match an existing entry.
1012    pub fn upsert_tool_call_inner(
1013        &mut self,
1014        tool_call_update: acp::ToolCallUpdate,
1015        status: ToolCallStatus,
1016        cx: &mut Context<Self>,
1017    ) -> Result<(), acp::Error> {
1018        let language_registry = self.project.read(cx).languages().clone();
1019        let id = tool_call_update.id.clone();
1020
1021        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1022            current_call.update_fields(tool_call_update.fields, language_registry, cx);
1023            current_call.status = status;
1024
1025            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1026        } else {
1027            let call =
1028                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1029            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1030        };
1031
1032        self.resolve_locations(id, cx);
1033        Ok(())
1034    }
1035
1036    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1037        // The tool call we are looking for is typically the last one, or very close to the end.
1038        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1039        self.entries
1040            .iter_mut()
1041            .enumerate()
1042            .rev()
1043            .find_map(|(index, tool_call)| {
1044                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1045                    && &tool_call.id == id
1046                {
1047                    Some((index, tool_call))
1048                } else {
1049                    None
1050                }
1051            })
1052    }
1053
1054    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1055        let project = self.project.clone();
1056        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1057            return;
1058        };
1059        let task = tool_call.resolve_locations(project, cx);
1060        cx.spawn(async move |this, cx| {
1061            let resolved_locations = task.await;
1062            this.update(cx, |this, cx| {
1063                let project = this.project.clone();
1064                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1065                    return;
1066                };
1067                if let Some(Some(location)) = resolved_locations.last() {
1068                    project.update(cx, |project, cx| {
1069                        if let Some(agent_location) = project.agent_location() {
1070                            let should_ignore = agent_location.buffer == location.buffer
1071                                && location
1072                                    .buffer
1073                                    .update(cx, |buffer, _| {
1074                                        let snapshot = buffer.snapshot();
1075                                        let old_position =
1076                                            agent_location.position.to_point(&snapshot);
1077                                        let new_position = location.position.to_point(&snapshot);
1078                                        // ignore this so that when we get updates from the edit tool
1079                                        // the position doesn't reset to the startof line
1080                                        old_position.row == new_position.row
1081                                            && old_position.column > new_position.column
1082                                    })
1083                                    .ok()
1084                                    .unwrap_or_default();
1085                            if !should_ignore {
1086                                project.set_agent_location(Some(location.clone()), cx);
1087                            }
1088                        }
1089                    });
1090                }
1091                if tool_call.resolved_locations != resolved_locations {
1092                    tool_call.resolved_locations = resolved_locations;
1093                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1094                }
1095            })
1096        })
1097        .detach();
1098    }
1099
1100    pub fn request_tool_call_authorization(
1101        &mut self,
1102        tool_call: acp::ToolCallUpdate,
1103        options: Vec<acp::PermissionOption>,
1104        cx: &mut Context<Self>,
1105    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1106        let (tx, rx) = oneshot::channel();
1107
1108        let status = ToolCallStatus::WaitingForConfirmation {
1109            options,
1110            respond_tx: tx,
1111        };
1112
1113        self.upsert_tool_call_inner(tool_call, status, cx)?;
1114        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1115        Ok(rx)
1116    }
1117
1118    pub fn authorize_tool_call(
1119        &mut self,
1120        id: acp::ToolCallId,
1121        option_id: acp::PermissionOptionId,
1122        option_kind: acp::PermissionOptionKind,
1123        cx: &mut Context<Self>,
1124    ) {
1125        let Some((ix, call)) = self.tool_call_mut(&id) else {
1126            return;
1127        };
1128
1129        let new_status = match option_kind {
1130            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1131                ToolCallStatus::Rejected
1132            }
1133            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1134                ToolCallStatus::InProgress
1135            }
1136        };
1137
1138        let curr_status = mem::replace(&mut call.status, new_status);
1139
1140        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1141            respond_tx.send(option_id).log_err();
1142        } else if cfg!(debug_assertions) {
1143            panic!("tried to authorize an already authorized tool call");
1144        }
1145
1146        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1147    }
1148
1149    /// Returns true if the last turn is awaiting tool authorization
1150    pub fn waiting_for_tool_confirmation(&self) -> bool {
1151        for entry in self.entries.iter().rev() {
1152            match &entry {
1153                AgentThreadEntry::ToolCall(call) => match call.status {
1154                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1155                    ToolCallStatus::Pending
1156                    | ToolCallStatus::InProgress
1157                    | ToolCallStatus::Completed
1158                    | ToolCallStatus::Failed
1159                    | ToolCallStatus::Rejected
1160                    | ToolCallStatus::Canceled => continue,
1161                },
1162                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1163                    // Reached the beginning of the turn
1164                    return false;
1165                }
1166            }
1167        }
1168        false
1169    }
1170
1171    pub fn plan(&self) -> &Plan {
1172        &self.plan
1173    }
1174
1175    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1176        let new_entries_len = request.entries.len();
1177        let mut new_entries = request.entries.into_iter();
1178
1179        // Reuse existing markdown to prevent flickering
1180        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1181            let PlanEntry {
1182                content,
1183                priority,
1184                status,
1185            } = old;
1186            content.update(cx, |old, cx| {
1187                old.replace(new.content, cx);
1188            });
1189            *priority = new.priority;
1190            *status = new.status;
1191        }
1192        for new in new_entries {
1193            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1194        }
1195        self.plan.entries.truncate(new_entries_len);
1196
1197        cx.notify();
1198    }
1199
1200    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1201        self.plan
1202            .entries
1203            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1204        cx.notify();
1205    }
1206
1207    #[cfg(any(test, feature = "test-support"))]
1208    pub fn send_raw(
1209        &mut self,
1210        message: &str,
1211        cx: &mut Context<Self>,
1212    ) -> BoxFuture<'static, Result<()>> {
1213        self.send(
1214            vec![acp::ContentBlock::Text(acp::TextContent {
1215                text: message.to_string(),
1216                annotations: None,
1217            })],
1218            cx,
1219        )
1220    }
1221
1222    pub fn send(
1223        &mut self,
1224        message: Vec<acp::ContentBlock>,
1225        cx: &mut Context<Self>,
1226    ) -> BoxFuture<'static, Result<()>> {
1227        let block = ContentBlock::new_combined(
1228            message.clone(),
1229            self.project.read(cx).languages().clone(),
1230            cx,
1231        );
1232        let request = acp::PromptRequest {
1233            prompt: message.clone(),
1234            session_id: self.session_id.clone(),
1235        };
1236        let git_store = self.project.read(cx).git_store().clone();
1237
1238        let message_id = if self
1239            .connection
1240            .session_editor(&self.session_id, cx)
1241            .is_some()
1242        {
1243            Some(UserMessageId::new())
1244        } else {
1245            None
1246        };
1247
1248        self.run_turn(cx, async move |this, cx| {
1249            this.update(cx, |this, cx| {
1250                this.push_entry(
1251                    AgentThreadEntry::UserMessage(UserMessage {
1252                        id: message_id.clone(),
1253                        content: block,
1254                        chunks: message,
1255                        checkpoint: None,
1256                    }),
1257                    cx,
1258                );
1259            })
1260            .ok();
1261
1262            let old_checkpoint = git_store
1263                .update(cx, |git, cx| git.checkpoint(cx))?
1264                .await
1265                .context("failed to get old checkpoint")
1266                .log_err();
1267            this.update(cx, |this, cx| {
1268                if let Some((_ix, message)) = this.last_user_message() {
1269                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1270                        git_checkpoint,
1271                        show: false,
1272                    });
1273                }
1274                this.connection.prompt(message_id, request, cx)
1275            })?
1276            .await
1277        })
1278    }
1279
1280    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1281        self.run_turn(cx, async move |this, cx| {
1282            this.update(cx, |this, cx| {
1283                this.connection
1284                    .resume(&this.session_id, cx)
1285                    .map(|resume| resume.run(cx))
1286            })?
1287            .context("resuming a session is not supported")?
1288            .await
1289        })
1290    }
1291
1292    fn run_turn(
1293        &mut self,
1294        cx: &mut Context<Self>,
1295        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1296    ) -> BoxFuture<'static, Result<()>> {
1297        self.clear_completed_plan_entries(cx);
1298
1299        let (tx, rx) = oneshot::channel();
1300        let cancel_task = self.cancel(cx);
1301
1302        self.send_task = Some(cx.spawn(async move |this, cx| {
1303            cancel_task.await;
1304            tx.send(f(this, cx).await).ok();
1305        }));
1306
1307        cx.spawn(async move |this, cx| {
1308            let response = rx.await;
1309
1310            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1311                .await?;
1312
1313            this.update(cx, |this, cx| {
1314                this.project
1315                    .update(cx, |project, cx| project.set_agent_location(None, cx));
1316                match response {
1317                    Ok(Err(e)) => {
1318                        this.send_task.take();
1319                        cx.emit(AcpThreadEvent::Error);
1320                        Err(e)
1321                    }
1322                    result => {
1323                        let canceled = matches!(
1324                            result,
1325                            Ok(Ok(acp::PromptResponse {
1326                                stop_reason: acp::StopReason::Canceled
1327                            }))
1328                        );
1329
1330                        // We only take the task if the current prompt wasn't canceled.
1331                        //
1332                        // This prompt may have been canceled because another one was sent
1333                        // while it was still generating. In these cases, dropping `send_task`
1334                        // would cause the next generation to be canceled.
1335                        if !canceled {
1336                            this.send_task.take();
1337                        }
1338
1339                        cx.emit(AcpThreadEvent::Stopped);
1340                        Ok(())
1341                    }
1342                }
1343            })?
1344        })
1345        .boxed()
1346    }
1347
1348    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1349        let Some(send_task) = self.send_task.take() else {
1350            return Task::ready(());
1351        };
1352
1353        for entry in self.entries.iter_mut() {
1354            if let AgentThreadEntry::ToolCall(call) = entry {
1355                let cancel = matches!(
1356                    call.status,
1357                    ToolCallStatus::Pending
1358                        | ToolCallStatus::WaitingForConfirmation { .. }
1359                        | ToolCallStatus::InProgress
1360                );
1361
1362                if cancel {
1363                    call.status = ToolCallStatus::Canceled;
1364                }
1365            }
1366        }
1367
1368        self.connection.cancel(&self.session_id, cx);
1369
1370        // Wait for the send task to complete
1371        cx.foreground_executor().spawn(send_task)
1372    }
1373
1374    /// Rewinds this thread to before the entry at `index`, removing it and all
1375    /// subsequent entries while reverting any changes made from that point.
1376    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1377        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1378            return Task::ready(Err(anyhow!("not supported")));
1379        };
1380        let Some(message) = self.user_message(&id) else {
1381            return Task::ready(Err(anyhow!("message not found")));
1382        };
1383
1384        let checkpoint = message
1385            .checkpoint
1386            .as_ref()
1387            .map(|c| c.git_checkpoint.clone());
1388
1389        let git_store = self.project.read(cx).git_store().clone();
1390        cx.spawn(async move |this, cx| {
1391            if let Some(checkpoint) = checkpoint {
1392                git_store
1393                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1394                    .await?;
1395            }
1396
1397            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1398                .await?;
1399            this.update(cx, |this, cx| {
1400                if let Some((ix, _)) = this.user_message_mut(&id) {
1401                    let range = ix..this.entries.len();
1402                    this.entries.truncate(ix);
1403                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1404                }
1405            })
1406        })
1407    }
1408
1409    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1410        let git_store = self.project.read(cx).git_store().clone();
1411
1412        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1413            if let Some(checkpoint) = message.checkpoint.as_ref() {
1414                checkpoint.git_checkpoint.clone()
1415            } else {
1416                return Task::ready(Ok(()));
1417            }
1418        } else {
1419            return Task::ready(Ok(()));
1420        };
1421
1422        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1423        cx.spawn(async move |this, cx| {
1424            let new_checkpoint = new_checkpoint
1425                .await
1426                .context("failed to get new checkpoint")
1427                .log_err();
1428            if let Some(new_checkpoint) = new_checkpoint {
1429                let equal = git_store
1430                    .update(cx, |git, cx| {
1431                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1432                    })?
1433                    .await
1434                    .unwrap_or(true);
1435                this.update(cx, |this, cx| {
1436                    let (ix, message) = this.last_user_message().context("no user message")?;
1437                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1438                    checkpoint.show = !equal;
1439                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1440                    anyhow::Ok(())
1441                })??;
1442            }
1443
1444            Ok(())
1445        })
1446    }
1447
1448    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1449        self.entries
1450            .iter_mut()
1451            .enumerate()
1452            .rev()
1453            .find_map(|(ix, entry)| {
1454                if let AgentThreadEntry::UserMessage(message) = entry {
1455                    Some((ix, message))
1456                } else {
1457                    None
1458                }
1459            })
1460    }
1461
1462    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1463        self.entries.iter().find_map(|entry| {
1464            if let AgentThreadEntry::UserMessage(message) = entry {
1465                if message.id.as_ref() == Some(id) {
1466                    Some(message)
1467                } else {
1468                    None
1469                }
1470            } else {
1471                None
1472            }
1473        })
1474    }
1475
1476    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1477        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1478            if let AgentThreadEntry::UserMessage(message) = entry {
1479                if message.id.as_ref() == Some(id) {
1480                    Some((ix, message))
1481                } else {
1482                    None
1483                }
1484            } else {
1485                None
1486            }
1487        })
1488    }
1489
1490    pub fn read_text_file(
1491        &self,
1492        path: PathBuf,
1493        line: Option<u32>,
1494        limit: Option<u32>,
1495        reuse_shared_snapshot: bool,
1496        cx: &mut Context<Self>,
1497    ) -> Task<Result<String>> {
1498        let project = self.project.clone();
1499        let action_log = self.action_log.clone();
1500        cx.spawn(async move |this, cx| {
1501            let load = project.update(cx, |project, cx| {
1502                let path = project
1503                    .project_path_for_absolute_path(&path, cx)
1504                    .context("invalid path")?;
1505                anyhow::Ok(project.open_buffer(path, cx))
1506            });
1507            let buffer = load??.await?;
1508
1509            let snapshot = if reuse_shared_snapshot {
1510                this.read_with(cx, |this, _| {
1511                    this.shared_buffers.get(&buffer.clone()).cloned()
1512                })
1513                .log_err()
1514                .flatten()
1515            } else {
1516                None
1517            };
1518
1519            let snapshot = if let Some(snapshot) = snapshot {
1520                snapshot
1521            } else {
1522                action_log.update(cx, |action_log, cx| {
1523                    action_log.buffer_read(buffer.clone(), cx);
1524                })?;
1525                project.update(cx, |project, cx| {
1526                    let position = buffer
1527                        .read(cx)
1528                        .snapshot()
1529                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1530                    project.set_agent_location(
1531                        Some(AgentLocation {
1532                            buffer: buffer.downgrade(),
1533                            position,
1534                        }),
1535                        cx,
1536                    );
1537                })?;
1538
1539                buffer.update(cx, |buffer, _| buffer.snapshot())?
1540            };
1541
1542            this.update(cx, |this, _| {
1543                let text = snapshot.text();
1544                this.shared_buffers.insert(buffer.clone(), snapshot);
1545                if line.is_none() && limit.is_none() {
1546                    return Ok(text);
1547                }
1548                let limit = limit.unwrap_or(u32::MAX) as usize;
1549                let Some(line) = line else {
1550                    return Ok(text.lines().take(limit).collect::<String>());
1551                };
1552
1553                let count = text.lines().count();
1554                if count < line as usize {
1555                    anyhow::bail!("There are only {} lines", count);
1556                }
1557                Ok(text
1558                    .lines()
1559                    .skip(line as usize + 1)
1560                    .take(limit)
1561                    .collect::<String>())
1562            })?
1563        })
1564    }
1565
1566    pub fn write_text_file(
1567        &self,
1568        path: PathBuf,
1569        content: String,
1570        cx: &mut Context<Self>,
1571    ) -> Task<Result<()>> {
1572        let project = self.project.clone();
1573        let action_log = self.action_log.clone();
1574        cx.spawn(async move |this, cx| {
1575            let load = project.update(cx, |project, cx| {
1576                let path = project
1577                    .project_path_for_absolute_path(&path, cx)
1578                    .context("invalid path")?;
1579                anyhow::Ok(project.open_buffer(path, cx))
1580            });
1581            let buffer = load??.await?;
1582            let snapshot = this.update(cx, |this, cx| {
1583                this.shared_buffers
1584                    .get(&buffer)
1585                    .cloned()
1586                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1587            })?;
1588            let edits = cx
1589                .background_executor()
1590                .spawn(async move {
1591                    let old_text = snapshot.text();
1592                    text_diff(old_text.as_str(), &content)
1593                        .into_iter()
1594                        .map(|(range, replacement)| {
1595                            (
1596                                snapshot.anchor_after(range.start)
1597                                    ..snapshot.anchor_before(range.end),
1598                                replacement,
1599                            )
1600                        })
1601                        .collect::<Vec<_>>()
1602                })
1603                .await;
1604            cx.update(|cx| {
1605                project.update(cx, |project, cx| {
1606                    project.set_agent_location(
1607                        Some(AgentLocation {
1608                            buffer: buffer.downgrade(),
1609                            position: edits
1610                                .last()
1611                                .map(|(range, _)| range.end)
1612                                .unwrap_or(Anchor::MIN),
1613                        }),
1614                        cx,
1615                    );
1616                });
1617
1618                action_log.update(cx, |action_log, cx| {
1619                    action_log.buffer_read(buffer.clone(), cx);
1620                });
1621                buffer.update(cx, |buffer, cx| {
1622                    buffer.edit(edits, None, cx);
1623                });
1624                action_log.update(cx, |action_log, cx| {
1625                    action_log.buffer_edited(buffer.clone(), cx);
1626                });
1627            })?;
1628            project
1629                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1630                .await
1631        })
1632    }
1633
1634    pub fn to_markdown(&self, cx: &App) -> String {
1635        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1636    }
1637
1638    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1639        cx.emit(AcpThreadEvent::ServerExited(status));
1640    }
1641}
1642
1643fn markdown_for_raw_output(
1644    raw_output: &serde_json::Value,
1645    language_registry: &Arc<LanguageRegistry>,
1646    cx: &mut App,
1647) -> Option<Entity<Markdown>> {
1648    match raw_output {
1649        serde_json::Value::Null => None,
1650        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1651            Markdown::new(
1652                value.to_string().into(),
1653                Some(language_registry.clone()),
1654                None,
1655                cx,
1656            )
1657        })),
1658        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1659            Markdown::new(
1660                value.to_string().into(),
1661                Some(language_registry.clone()),
1662                None,
1663                cx,
1664            )
1665        })),
1666        serde_json::Value::String(value) => Some(cx.new(|cx| {
1667            Markdown::new(
1668                value.clone().into(),
1669                Some(language_registry.clone()),
1670                None,
1671                cx,
1672            )
1673        })),
1674        value => Some(cx.new(|cx| {
1675            Markdown::new(
1676                format!("```json\n{}\n```", value).into(),
1677                Some(language_registry.clone()),
1678                None,
1679                cx,
1680            )
1681        })),
1682    }
1683}
1684
1685#[cfg(test)]
1686mod tests {
1687    use super::*;
1688    use anyhow::anyhow;
1689    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1690    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1691    use indoc::indoc;
1692    use project::{FakeFs, Fs};
1693    use rand::Rng as _;
1694    use serde_json::json;
1695    use settings::SettingsStore;
1696    use smol::stream::StreamExt as _;
1697    use std::{
1698        any::Any,
1699        cell::RefCell,
1700        path::Path,
1701        rc::Rc,
1702        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1703        time::Duration,
1704    };
1705    use util::path;
1706
1707    fn init_test(cx: &mut TestAppContext) {
1708        env_logger::try_init().ok();
1709        cx.update(|cx| {
1710            let settings_store = SettingsStore::test(cx);
1711            cx.set_global(settings_store);
1712            Project::init_settings(cx);
1713            language::init(cx);
1714        });
1715    }
1716
1717    #[gpui::test]
1718    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1719        init_test(cx);
1720
1721        let fs = FakeFs::new(cx.executor());
1722        let project = Project::test(fs, [], cx).await;
1723        let connection = Rc::new(FakeAgentConnection::new());
1724        let thread = cx
1725            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1726            .await
1727            .unwrap();
1728
1729        // Test creating a new user message
1730        thread.update(cx, |thread, cx| {
1731            thread.push_user_content_block(
1732                None,
1733                acp::ContentBlock::Text(acp::TextContent {
1734                    annotations: None,
1735                    text: "Hello, ".to_string(),
1736                }),
1737                cx,
1738            );
1739        });
1740
1741        thread.update(cx, |thread, cx| {
1742            assert_eq!(thread.entries.len(), 1);
1743            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1744                assert_eq!(user_msg.id, None);
1745                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1746            } else {
1747                panic!("Expected UserMessage");
1748            }
1749        });
1750
1751        // Test appending to existing user message
1752        let message_1_id = UserMessageId::new();
1753        thread.update(cx, |thread, cx| {
1754            thread.push_user_content_block(
1755                Some(message_1_id.clone()),
1756                acp::ContentBlock::Text(acp::TextContent {
1757                    annotations: None,
1758                    text: "world!".to_string(),
1759                }),
1760                cx,
1761            );
1762        });
1763
1764        thread.update(cx, |thread, cx| {
1765            assert_eq!(thread.entries.len(), 1);
1766            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1767                assert_eq!(user_msg.id, Some(message_1_id));
1768                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1769            } else {
1770                panic!("Expected UserMessage");
1771            }
1772        });
1773
1774        // Test creating new user message after assistant message
1775        thread.update(cx, |thread, cx| {
1776            thread.push_assistant_content_block(
1777                acp::ContentBlock::Text(acp::TextContent {
1778                    annotations: None,
1779                    text: "Assistant response".to_string(),
1780                }),
1781                false,
1782                cx,
1783            );
1784        });
1785
1786        let message_2_id = UserMessageId::new();
1787        thread.update(cx, |thread, cx| {
1788            thread.push_user_content_block(
1789                Some(message_2_id.clone()),
1790                acp::ContentBlock::Text(acp::TextContent {
1791                    annotations: None,
1792                    text: "New user message".to_string(),
1793                }),
1794                cx,
1795            );
1796        });
1797
1798        thread.update(cx, |thread, cx| {
1799            assert_eq!(thread.entries.len(), 3);
1800            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1801                assert_eq!(user_msg.id, Some(message_2_id));
1802                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1803            } else {
1804                panic!("Expected UserMessage at index 2");
1805            }
1806        });
1807    }
1808
1809    #[gpui::test]
1810    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1811        init_test(cx);
1812
1813        let fs = FakeFs::new(cx.executor());
1814        let project = Project::test(fs, [], cx).await;
1815        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1816            |_, thread, mut cx| {
1817                async move {
1818                    thread.update(&mut cx, |thread, cx| {
1819                        thread
1820                            .handle_session_update(
1821                                acp::SessionUpdate::AgentThoughtChunk {
1822                                    content: "Thinking ".into(),
1823                                },
1824                                cx,
1825                            )
1826                            .unwrap();
1827                        thread
1828                            .handle_session_update(
1829                                acp::SessionUpdate::AgentThoughtChunk {
1830                                    content: "hard!".into(),
1831                                },
1832                                cx,
1833                            )
1834                            .unwrap();
1835                    })?;
1836                    Ok(acp::PromptResponse {
1837                        stop_reason: acp::StopReason::EndTurn,
1838                    })
1839                }
1840                .boxed_local()
1841            },
1842        ));
1843
1844        let thread = cx
1845            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1846            .await
1847            .unwrap();
1848
1849        thread
1850            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1851            .await
1852            .unwrap();
1853
1854        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1855        assert_eq!(
1856            output,
1857            indoc! {r#"
1858            ## User
1859
1860            Hello from Zed!
1861
1862            ## Assistant
1863
1864            <thinking>
1865            Thinking hard!
1866            </thinking>
1867
1868            "#}
1869        );
1870    }
1871
1872    #[gpui::test]
1873    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1874        init_test(cx);
1875
1876        let fs = FakeFs::new(cx.executor());
1877        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1878            .await;
1879        let project = Project::test(fs.clone(), [], cx).await;
1880        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1881        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1882        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1883            move |_, thread, mut cx| {
1884                let read_file_tx = read_file_tx.clone();
1885                async move {
1886                    let content = thread
1887                        .update(&mut cx, |thread, cx| {
1888                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1889                        })
1890                        .unwrap()
1891                        .await
1892                        .unwrap();
1893                    assert_eq!(content, "one\ntwo\nthree\n");
1894                    read_file_tx.take().unwrap().send(()).unwrap();
1895                    thread
1896                        .update(&mut cx, |thread, cx| {
1897                            thread.write_text_file(
1898                                path!("/tmp/foo").into(),
1899                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1900                                cx,
1901                            )
1902                        })
1903                        .unwrap()
1904                        .await
1905                        .unwrap();
1906                    Ok(acp::PromptResponse {
1907                        stop_reason: acp::StopReason::EndTurn,
1908                    })
1909                }
1910                .boxed_local()
1911            },
1912        ));
1913
1914        let (worktree, pathbuf) = project
1915            .update(cx, |project, cx| {
1916                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1917            })
1918            .await
1919            .unwrap();
1920        let buffer = project
1921            .update(cx, |project, cx| {
1922                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1923            })
1924            .await
1925            .unwrap();
1926
1927        let thread = cx
1928            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1929            .await
1930            .unwrap();
1931
1932        let request = thread.update(cx, |thread, cx| {
1933            thread.send_raw("Extend the count in /tmp/foo", cx)
1934        });
1935        read_file_rx.await.ok();
1936        buffer.update(cx, |buffer, cx| {
1937            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1938        });
1939        cx.run_until_parked();
1940        assert_eq!(
1941            buffer.read_with(cx, |buffer, _| buffer.text()),
1942            "zero\none\ntwo\nthree\nfour\nfive\n"
1943        );
1944        assert_eq!(
1945            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1946            "zero\none\ntwo\nthree\nfour\nfive\n"
1947        );
1948        request.await.unwrap();
1949    }
1950
1951    #[gpui::test]
1952    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1953        init_test(cx);
1954
1955        let fs = FakeFs::new(cx.executor());
1956        let project = Project::test(fs, [], cx).await;
1957        let id = acp::ToolCallId("test".into());
1958
1959        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1960            let id = id.clone();
1961            move |_, thread, mut cx| {
1962                let id = id.clone();
1963                async move {
1964                    thread
1965                        .update(&mut cx, |thread, cx| {
1966                            thread.handle_session_update(
1967                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1968                                    id: id.clone(),
1969                                    title: "Label".into(),
1970                                    kind: acp::ToolKind::Fetch,
1971                                    status: acp::ToolCallStatus::InProgress,
1972                                    content: vec![],
1973                                    locations: vec![],
1974                                    raw_input: None,
1975                                    raw_output: None,
1976                                }),
1977                                cx,
1978                            )
1979                        })
1980                        .unwrap()
1981                        .unwrap();
1982                    Ok(acp::PromptResponse {
1983                        stop_reason: acp::StopReason::EndTurn,
1984                    })
1985                }
1986                .boxed_local()
1987            }
1988        }));
1989
1990        let thread = cx
1991            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1992            .await
1993            .unwrap();
1994
1995        let request = thread.update(cx, |thread, cx| {
1996            thread.send_raw("Fetch https://example.com", cx)
1997        });
1998
1999        run_until_first_tool_call(&thread, cx).await;
2000
2001        thread.read_with(cx, |thread, _| {
2002            assert!(matches!(
2003                thread.entries[1],
2004                AgentThreadEntry::ToolCall(ToolCall {
2005                    status: ToolCallStatus::InProgress,
2006                    ..
2007                })
2008            ));
2009        });
2010
2011        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2012
2013        thread.read_with(cx, |thread, _| {
2014            assert!(matches!(
2015                &thread.entries[1],
2016                AgentThreadEntry::ToolCall(ToolCall {
2017                    status: ToolCallStatus::Canceled,
2018                    ..
2019                })
2020            ));
2021        });
2022
2023        thread
2024            .update(cx, |thread, cx| {
2025                thread.handle_session_update(
2026                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2027                        id,
2028                        fields: acp::ToolCallUpdateFields {
2029                            status: Some(acp::ToolCallStatus::Completed),
2030                            ..Default::default()
2031                        },
2032                    }),
2033                    cx,
2034                )
2035            })
2036            .unwrap();
2037
2038        request.await.unwrap();
2039
2040        thread.read_with(cx, |thread, _| {
2041            assert!(matches!(
2042                thread.entries[1],
2043                AgentThreadEntry::ToolCall(ToolCall {
2044                    status: ToolCallStatus::Completed,
2045                    ..
2046                })
2047            ));
2048        });
2049    }
2050
2051    #[gpui::test]
2052    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2053        init_test(cx);
2054        let fs = FakeFs::new(cx.background_executor.clone());
2055        fs.insert_tree(path!("/test"), json!({})).await;
2056        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2057
2058        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2059            move |_, thread, mut cx| {
2060                async move {
2061                    thread
2062                        .update(&mut cx, |thread, cx| {
2063                            thread.handle_session_update(
2064                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2065                                    id: acp::ToolCallId("test".into()),
2066                                    title: "Label".into(),
2067                                    kind: acp::ToolKind::Edit,
2068                                    status: acp::ToolCallStatus::Completed,
2069                                    content: vec![acp::ToolCallContent::Diff {
2070                                        diff: acp::Diff {
2071                                            path: "/test/test.txt".into(),
2072                                            old_text: None,
2073                                            new_text: "foo".into(),
2074                                        },
2075                                    }],
2076                                    locations: vec![],
2077                                    raw_input: None,
2078                                    raw_output: None,
2079                                }),
2080                                cx,
2081                            )
2082                        })
2083                        .unwrap()
2084                        .unwrap();
2085                    Ok(acp::PromptResponse {
2086                        stop_reason: acp::StopReason::EndTurn,
2087                    })
2088                }
2089                .boxed_local()
2090            }
2091        }));
2092
2093        let thread = cx
2094            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2095            .await
2096            .unwrap();
2097
2098        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2099            .await
2100            .unwrap();
2101
2102        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2103    }
2104
2105    #[gpui::test(iterations = 10)]
2106    async fn test_checkpoints(cx: &mut TestAppContext) {
2107        init_test(cx);
2108        let fs = FakeFs::new(cx.background_executor.clone());
2109        fs.insert_tree(
2110            path!("/test"),
2111            json!({
2112                ".git": {}
2113            }),
2114        )
2115        .await;
2116        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2117
2118        let simulate_changes = Arc::new(AtomicBool::new(true));
2119        let next_filename = Arc::new(AtomicUsize::new(0));
2120        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2121            let simulate_changes = simulate_changes.clone();
2122            let next_filename = next_filename.clone();
2123            let fs = fs.clone();
2124            move |request, thread, mut cx| {
2125                let fs = fs.clone();
2126                let simulate_changes = simulate_changes.clone();
2127                let next_filename = next_filename.clone();
2128                async move {
2129                    if simulate_changes.load(SeqCst) {
2130                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2131                        fs.write(Path::new(&filename), b"").await?;
2132                    }
2133
2134                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2135                        panic!("expected text content block");
2136                    };
2137                    thread.update(&mut cx, |thread, cx| {
2138                        thread
2139                            .handle_session_update(
2140                                acp::SessionUpdate::AgentMessageChunk {
2141                                    content: content.text.to_uppercase().into(),
2142                                },
2143                                cx,
2144                            )
2145                            .unwrap();
2146                    })?;
2147                    Ok(acp::PromptResponse {
2148                        stop_reason: acp::StopReason::EndTurn,
2149                    })
2150                }
2151                .boxed_local()
2152            }
2153        }));
2154        let thread = cx
2155            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2156            .await
2157            .unwrap();
2158
2159        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2160            .await
2161            .unwrap();
2162        thread.read_with(cx, |thread, cx| {
2163            assert_eq!(
2164                thread.to_markdown(cx),
2165                indoc! {"
2166                    ## User (checkpoint)
2167
2168                    Lorem
2169
2170                    ## Assistant
2171
2172                    LOREM
2173
2174                "}
2175            );
2176        });
2177        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2178
2179        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2180            .await
2181            .unwrap();
2182        thread.read_with(cx, |thread, cx| {
2183            assert_eq!(
2184                thread.to_markdown(cx),
2185                indoc! {"
2186                    ## User (checkpoint)
2187
2188                    Lorem
2189
2190                    ## Assistant
2191
2192                    LOREM
2193
2194                    ## User (checkpoint)
2195
2196                    ipsum
2197
2198                    ## Assistant
2199
2200                    IPSUM
2201
2202                "}
2203            );
2204        });
2205        assert_eq!(
2206            fs.files(),
2207            vec![
2208                Path::new(path!("/test/file-0")),
2209                Path::new(path!("/test/file-1"))
2210            ]
2211        );
2212
2213        // Checkpoint isn't stored when there are no changes.
2214        simulate_changes.store(false, SeqCst);
2215        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2216            .await
2217            .unwrap();
2218        thread.read_with(cx, |thread, cx| {
2219            assert_eq!(
2220                thread.to_markdown(cx),
2221                indoc! {"
2222                    ## User (checkpoint)
2223
2224                    Lorem
2225
2226                    ## Assistant
2227
2228                    LOREM
2229
2230                    ## User (checkpoint)
2231
2232                    ipsum
2233
2234                    ## Assistant
2235
2236                    IPSUM
2237
2238                    ## User
2239
2240                    dolor
2241
2242                    ## Assistant
2243
2244                    DOLOR
2245
2246                "}
2247            );
2248        });
2249        assert_eq!(
2250            fs.files(),
2251            vec![
2252                Path::new(path!("/test/file-0")),
2253                Path::new(path!("/test/file-1"))
2254            ]
2255        );
2256
2257        // Rewinding the conversation truncates the history and restores the checkpoint.
2258        thread
2259            .update(cx, |thread, cx| {
2260                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2261                    panic!("unexpected entries {:?}", thread.entries)
2262                };
2263                thread.rewind(message.id.clone().unwrap(), cx)
2264            })
2265            .await
2266            .unwrap();
2267        thread.read_with(cx, |thread, cx| {
2268            assert_eq!(
2269                thread.to_markdown(cx),
2270                indoc! {"
2271                    ## User (checkpoint)
2272
2273                    Lorem
2274
2275                    ## Assistant
2276
2277                    LOREM
2278
2279                "}
2280            );
2281        });
2282        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2283    }
2284
2285    async fn run_until_first_tool_call(
2286        thread: &Entity<AcpThread>,
2287        cx: &mut TestAppContext,
2288    ) -> usize {
2289        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2290
2291        let subscription = cx.update(|cx| {
2292            cx.subscribe(thread, move |thread, _, cx| {
2293                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2294                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2295                        return tx.try_send(ix).unwrap();
2296                    }
2297                }
2298            })
2299        });
2300
2301        select! {
2302            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2303                panic!("Timeout waiting for tool call")
2304            }
2305            ix = rx.next().fuse() => {
2306                drop(subscription);
2307                ix.unwrap()
2308            }
2309        }
2310    }
2311
2312    #[derive(Clone, Default)]
2313    struct FakeAgentConnection {
2314        auth_methods: Vec<acp::AuthMethod>,
2315        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2316        on_user_message: Option<
2317            Rc<
2318                dyn Fn(
2319                        acp::PromptRequest,
2320                        WeakEntity<AcpThread>,
2321                        AsyncApp,
2322                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2323                    + 'static,
2324            >,
2325        >,
2326    }
2327
2328    impl FakeAgentConnection {
2329        fn new() -> Self {
2330            Self {
2331                auth_methods: Vec::new(),
2332                on_user_message: None,
2333                sessions: Arc::default(),
2334            }
2335        }
2336
2337        #[expect(unused)]
2338        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2339            self.auth_methods = auth_methods;
2340            self
2341        }
2342
2343        fn on_user_message(
2344            mut self,
2345            handler: impl Fn(
2346                acp::PromptRequest,
2347                WeakEntity<AcpThread>,
2348                AsyncApp,
2349            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2350            + 'static,
2351        ) -> Self {
2352            self.on_user_message.replace(Rc::new(handler));
2353            self
2354        }
2355    }
2356
2357    impl AgentConnection for FakeAgentConnection {
2358        fn auth_methods(&self) -> &[acp::AuthMethod] {
2359            &self.auth_methods
2360        }
2361
2362        fn new_thread(
2363            self: Rc<Self>,
2364            project: Entity<Project>,
2365            _cwd: &Path,
2366            cx: &mut App,
2367        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2368            let session_id = acp::SessionId(
2369                rand::thread_rng()
2370                    .sample_iter(&rand::distributions::Alphanumeric)
2371                    .take(7)
2372                    .map(char::from)
2373                    .collect::<String>()
2374                    .into(),
2375            );
2376            let action_log = cx.new(|_| ActionLog::new(project.clone()));
2377            let thread = cx.new(|_cx| {
2378                AcpThread::new(
2379                    "Test",
2380                    self.clone(),
2381                    project,
2382                    action_log,
2383                    session_id.clone(),
2384                )
2385            });
2386            self.sessions.lock().insert(session_id, thread.downgrade());
2387            Task::ready(Ok(thread))
2388        }
2389
2390        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2391            if self.auth_methods().iter().any(|m| m.id == method) {
2392                Task::ready(Ok(()))
2393            } else {
2394                Task::ready(Err(anyhow!("Invalid Auth Method")))
2395            }
2396        }
2397
2398        fn prompt(
2399            &self,
2400            _id: Option<UserMessageId>,
2401            params: acp::PromptRequest,
2402            cx: &mut App,
2403        ) -> Task<gpui::Result<acp::PromptResponse>> {
2404            let sessions = self.sessions.lock();
2405            let thread = sessions.get(&params.session_id).unwrap();
2406            if let Some(handler) = &self.on_user_message {
2407                let handler = handler.clone();
2408                let thread = thread.clone();
2409                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2410            } else {
2411                Task::ready(Ok(acp::PromptResponse {
2412                    stop_reason: acp::StopReason::EndTurn,
2413                }))
2414            }
2415        }
2416
2417        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2418            let sessions = self.sessions.lock();
2419            let thread = sessions.get(session_id).unwrap().clone();
2420
2421            cx.spawn(async move |cx| {
2422                thread
2423                    .update(cx, |thread, cx| thread.cancel(cx))
2424                    .unwrap()
2425                    .await
2426            })
2427            .detach();
2428        }
2429
2430        fn session_editor(
2431            &self,
2432            session_id: &acp::SessionId,
2433            _cx: &mut App,
2434        ) -> Option<Rc<dyn AgentSessionEditor>> {
2435            Some(Rc::new(FakeAgentSessionEditor {
2436                _session_id: session_id.clone(),
2437            }))
2438        }
2439
2440        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2441            self
2442        }
2443    }
2444
2445    struct FakeAgentSessionEditor {
2446        _session_id: acp::SessionId,
2447    }
2448
2449    impl AgentSessionEditor for FakeAgentSessionEditor {
2450        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2451            Task::ready(Ok(()))
2452        }
2453    }
2454}