acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9pub use terminal::*;
  10
  11use action_log::ActionLog;
  12use agent_client_protocol as acp;
  13use anyhow::{Context as _, Result, anyhow};
  14use editor::Bias;
  15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  17use itertools::Itertools;
  18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  19use markdown::Markdown;
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use std::collections::HashMap;
  22use std::error::Error;
  23use std::fmt::{Formatter, Write};
  24use std::ops::Range;
  25use std::process::ExitStatus;
  26use std::rc::Rc;
  27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  28use ui::App;
  29use util::ResultExt;
  30
/// A user-authored entry in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of this message, when one is known.
    pub id: Option<UserMessageId>,
    /// Renderable content accumulated from every chunk appended so far.
    pub content: ContentBlock,
    /// The raw protocol content blocks, in the order they were pushed.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint associated with this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  38
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message's markdown heading becomes
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
  44
  45impl UserMessage {
  46    fn to_markdown(&self, cx: &App) -> String {
  47        let mut markdown = String::new();
  48        if self
  49            .checkpoint
  50            .as_ref()
  51            .map_or(false, |checkpoint| checkpoint.show)
  52        {
  53            writeln!(markdown, "## User (checkpoint)").unwrap();
  54        } else {
  55            writeln!(markdown, "## User").unwrap();
  56        }
  57        writeln!(markdown).unwrap();
  58        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  59        writeln!(markdown).unwrap();
  60        markdown
  61    }
  62}
  63
  64#[derive(Debug, PartialEq)]
  65pub struct AssistantMessage {
  66    pub chunks: Vec<AssistantMessageChunk>,
  67}
  68
  69impl AssistantMessage {
  70    pub fn to_markdown(&self, cx: &App) -> String {
  71        format!(
  72            "## Assistant\n\n{}\n\n",
  73            self.chunks
  74                .iter()
  75                .map(|chunk| chunk.to_markdown(cx))
  76                .join("\n\n")
  77        )
  78    }
  79}
  80
  81#[derive(Debug, PartialEq)]
  82pub enum AssistantMessageChunk {
  83    Message { block: ContentBlock },
  84    Thought { block: ContentBlock },
  85}
  86
  87impl AssistantMessageChunk {
  88    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  89        Self::Message {
  90            block: ContentBlock::new(chunk.into(), language_registry, cx),
  91        }
  92    }
  93
  94    fn to_markdown(&self, cx: &App) -> String {
  95        match self {
  96            Self::Message { block } => block.to_markdown(cx).to_string(),
  97            Self::Thought { block } => {
  98                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
  99            }
 100        }
 101    }
 102}
 103
 104#[derive(Debug)]
 105pub enum AgentThreadEntry {
 106    UserMessage(UserMessage),
 107    AssistantMessage(AssistantMessage),
 108    ToolCall(ToolCall),
 109}
 110
 111impl AgentThreadEntry {
 112    pub fn to_markdown(&self, cx: &App) -> String {
 113        match self {
 114            Self::UserMessage(message) => message.to_markdown(cx),
 115            Self::AssistantMessage(message) => message.to_markdown(cx),
 116            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 117        }
 118    }
 119
 120    pub fn user_message(&self) -> Option<&UserMessage> {
 121        if let AgentThreadEntry::UserMessage(message) = self {
 122            Some(message)
 123        } else {
 124            None
 125        }
 126    }
 127
 128    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 129        if let AgentThreadEntry::ToolCall(call) = self {
 130            itertools::Either::Left(call.diffs())
 131        } else {
 132            itertools::Either::Right(std::iter::empty())
 133        }
 134    }
 135
 136    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 137        if let AgentThreadEntry::ToolCall(call) = self {
 138            itertools::Either::Left(call.terminals())
 139        } else {
 140            itertools::Either::Right(std::iter::empty())
 141        }
 142    }
 143
 144    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 145        if let AgentThreadEntry::ToolCall(ToolCall {
 146            locations,
 147            resolved_locations,
 148            ..
 149        }) = self
 150        {
 151            Some((
 152                locations.get(ix)?.clone(),
 153                resolved_locations.get(ix)?.clone()?,
 154            ))
 155        } else {
 156            None
 157        }
 158    }
 159}
 160
/// Thread-side state of a tool call reported by the agent.
#[derive(Debug)]
pub struct ToolCall {
    pub id: acp::ToolCallId,
    /// Markdown entity holding the tool call's title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    /// Content produced by the call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    /// Locations as reported by the agent.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index;
    /// `None` marks a location that could not be resolved.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    pub raw_input: Option<serde_json::Value>,
    pub raw_output: Option<serde_json::Value>,
}
 173
 174impl ToolCall {
 175    fn from_acp(
 176        tool_call: acp::ToolCall,
 177        status: ToolCallStatus,
 178        language_registry: Arc<LanguageRegistry>,
 179        cx: &mut App,
 180    ) -> Self {
 181        Self {
 182            id: tool_call.id,
 183            label: cx.new(|cx| {
 184                Markdown::new(
 185                    tool_call.title.into(),
 186                    Some(language_registry.clone()),
 187                    None,
 188                    cx,
 189                )
 190            }),
 191            kind: tool_call.kind,
 192            content: tool_call
 193                .content
 194                .into_iter()
 195                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 196                .collect(),
 197            locations: tool_call.locations,
 198            resolved_locations: Vec::default(),
 199            status,
 200            raw_input: tool_call.raw_input,
 201            raw_output: tool_call.raw_output,
 202        }
 203    }
 204
 205    fn update_fields(
 206        &mut self,
 207        fields: acp::ToolCallUpdateFields,
 208        language_registry: Arc<LanguageRegistry>,
 209        cx: &mut App,
 210    ) {
 211        let acp::ToolCallUpdateFields {
 212            kind,
 213            status,
 214            title,
 215            content,
 216            locations,
 217            raw_input,
 218            raw_output,
 219        } = fields;
 220
 221        if let Some(kind) = kind {
 222            self.kind = kind;
 223        }
 224
 225        if let Some(status) = status {
 226            self.status = status.into();
 227        }
 228
 229        if let Some(title) = title {
 230            self.label.update(cx, |label, cx| {
 231                label.replace(title, cx);
 232            });
 233        }
 234
 235        if let Some(content) = content {
 236            self.content = content
 237                .into_iter()
 238                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 239                .collect();
 240        }
 241
 242        if let Some(locations) = locations {
 243            self.locations = locations;
 244        }
 245
 246        if let Some(raw_input) = raw_input {
 247            self.raw_input = Some(raw_input);
 248        }
 249
 250        if let Some(raw_output) = raw_output {
 251            if self.content.is_empty() {
 252                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 253                {
 254                    self.content
 255                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 256                            markdown,
 257                        }));
 258                }
 259            }
 260            self.raw_output = Some(raw_output);
 261        }
 262    }
 263
 264    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 265        self.content.iter().filter_map(|content| match content {
 266            ToolCallContent::Diff(diff) => Some(diff),
 267            ToolCallContent::ContentBlock(_) => None,
 268            ToolCallContent::Terminal(_) => None,
 269        })
 270    }
 271
 272    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 273        self.content.iter().filter_map(|content| match content {
 274            ToolCallContent::Terminal(terminal) => Some(terminal),
 275            ToolCallContent::ContentBlock(_) => None,
 276            ToolCallContent::Diff(_) => None,
 277        })
 278    }
 279
 280    fn to_markdown(&self, cx: &App) -> String {
 281        let mut markdown = format!(
 282            "**Tool Call: {}**\nStatus: {}\n\n",
 283            self.label.read(cx).source(),
 284            self.status
 285        );
 286        for content in &self.content {
 287            markdown.push_str(content.to_markdown(cx).as_str());
 288            markdown.push_str("\n\n");
 289        }
 290        markdown
 291    }
 292
 293    async fn resolve_location(
 294        location: acp::ToolCallLocation,
 295        project: WeakEntity<Project>,
 296        cx: &mut AsyncApp,
 297    ) -> Option<AgentLocation> {
 298        let buffer = project
 299            .update(cx, |project, cx| {
 300                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 301                    Some(project.open_buffer(path, cx))
 302                } else {
 303                    None
 304                }
 305            })
 306            .ok()??;
 307        let buffer = buffer.await.log_err()?;
 308        let position = buffer
 309            .update(cx, |buffer, _| {
 310                if let Some(row) = location.line {
 311                    let snapshot = buffer.snapshot();
 312                    let column = snapshot.indent_size_for_line(row).len;
 313                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 314                    snapshot.anchor_before(point)
 315                } else {
 316                    Anchor::MIN
 317                }
 318            })
 319            .ok()?;
 320
 321        Some(AgentLocation {
 322            buffer: buffer.downgrade(),
 323            position,
 324        })
 325    }
 326
 327    fn resolve_locations(
 328        &self,
 329        project: Entity<Project>,
 330        cx: &mut App,
 331    ) -> Task<Vec<Option<AgentLocation>>> {
 332        let locations = self.locations.clone();
 333        project.update(cx, |_, cx| {
 334            cx.spawn(async move |project, cx| {
 335                let mut new_locations = Vec::new();
 336                for location in locations {
 337                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 338                }
 339                new_locations
 340            })
 341        })
 342    }
 343}
 344
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered for this call.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the selected option id is sent back.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 366
 367impl From<acp::ToolCallStatus> for ToolCallStatus {
 368    fn from(status: acp::ToolCallStatus) -> Self {
 369        match status {
 370            acp::ToolCallStatus::Pending => Self::Pending,
 371            acp::ToolCallStatus::InProgress => Self::InProgress,
 372            acp::ToolCallStatus::Completed => Self::Completed,
 373            acp::ToolCallStatus::Failed => Self::Failed,
 374        }
 375    }
 376}
 377
 378impl Display for ToolCallStatus {
 379    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 380        write!(
 381            f,
 382            "{}",
 383            match self {
 384                ToolCallStatus::Pending => "Pending",
 385                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 386                ToolCallStatus::InProgress => "In Progress",
 387                ToolCallStatus::Completed => "Completed",
 388                ToolCallStatus::Failed => "Failed",
 389                ToolCallStatus::Rejected => "Rejected",
 390                ToolCallStatus::Canceled => "Canceled",
 391            }
 392        )
 393    }
 394}
 395
/// Renderable content accumulated from one or more protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text built from the appended blocks.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link, kept as-is until more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
 402
 403impl ContentBlock {
 404    pub fn new(
 405        block: acp::ContentBlock,
 406        language_registry: &Arc<LanguageRegistry>,
 407        cx: &mut App,
 408    ) -> Self {
 409        let mut this = Self::Empty;
 410        this.append(block, language_registry, cx);
 411        this
 412    }
 413
 414    pub fn new_combined(
 415        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 416        language_registry: Arc<LanguageRegistry>,
 417        cx: &mut App,
 418    ) -> Self {
 419        let mut this = Self::Empty;
 420        for block in blocks {
 421            this.append(block, &language_registry, cx);
 422        }
 423        this
 424    }
 425
 426    pub fn append(
 427        &mut self,
 428        block: acp::ContentBlock,
 429        language_registry: &Arc<LanguageRegistry>,
 430        cx: &mut App,
 431    ) {
 432        if matches!(self, ContentBlock::Empty) {
 433            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 434                *self = ContentBlock::ResourceLink { resource_link };
 435                return;
 436            }
 437        }
 438
 439        let new_content = self.block_string_contents(block);
 440
 441        match self {
 442            ContentBlock::Empty => {
 443                *self = Self::create_markdown_block(new_content, language_registry, cx);
 444            }
 445            ContentBlock::Markdown { markdown } => {
 446                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 447            }
 448            ContentBlock::ResourceLink { resource_link } => {
 449                let existing_content = Self::resource_link_md(&resource_link.uri);
 450                let combined = format!("{}\n{}", existing_content, new_content);
 451
 452                *self = Self::create_markdown_block(combined, language_registry, cx);
 453            }
 454        }
 455    }
 456
 457    fn create_markdown_block(
 458        content: String,
 459        language_registry: &Arc<LanguageRegistry>,
 460        cx: &mut App,
 461    ) -> ContentBlock {
 462        ContentBlock::Markdown {
 463            markdown: cx
 464                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 465        }
 466    }
 467
 468    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 469        match block {
 470            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 471            acp::ContentBlock::ResourceLink(resource_link) => {
 472                Self::resource_link_md(&resource_link.uri)
 473            }
 474            acp::ContentBlock::Resource(acp::EmbeddedResource {
 475                resource:
 476                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 477                        uri,
 478                        ..
 479                    }),
 480                ..
 481            }) => Self::resource_link_md(&uri),
 482            acp::ContentBlock::Image(image) => Self::image_md(&image),
 483            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 484        }
 485    }
 486
 487    fn resource_link_md(uri: &str) -> String {
 488        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 489            uri.as_link().to_string()
 490        } else {
 491            uri.to_string()
 492        }
 493    }
 494
 495    fn image_md(_image: &acp::ImageContent) -> String {
 496        "`Image`".into()
 497    }
 498
 499    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 500        match self {
 501            ContentBlock::Empty => "",
 502            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 503            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 504        }
 505    }
 506
 507    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 508        match self {
 509            ContentBlock::Empty => None,
 510            ContentBlock::Markdown { markdown } => Some(markdown),
 511            ContentBlock::ResourceLink { .. } => None,
 512        }
 513    }
 514
 515    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 516        match self {
 517            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 518            _ => None,
 519        }
 520    }
 521}
 522
/// One piece of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Plain content (markdown, resource link, or empty).
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 529
 530impl ToolCallContent {
 531    pub fn from_acp(
 532        content: acp::ToolCallContent,
 533        language_registry: Arc<LanguageRegistry>,
 534        cx: &mut App,
 535    ) -> Self {
 536        match content {
 537            acp::ToolCallContent::Content { content } => {
 538                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 539            }
 540            acp::ToolCallContent::Diff { diff } => {
 541                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 542            }
 543        }
 544    }
 545
 546    pub fn to_markdown(&self, cx: &App) -> String {
 547        match self {
 548            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 549            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 550            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 551        }
 552    }
 553}
 554
 555#[derive(Debug, PartialEq)]
 556pub enum ToolCallUpdate {
 557    UpdateFields(acp::ToolCallUpdate),
 558    UpdateDiff(ToolCallUpdateDiff),
 559    UpdateTerminal(ToolCallUpdateTerminal),
 560}
 561
 562impl ToolCallUpdate {
 563    fn id(&self) -> &acp::ToolCallId {
 564        match self {
 565            Self::UpdateFields(update) => &update.id,
 566            Self::UpdateDiff(diff) => &diff.id,
 567            Self::UpdateTerminal(terminal) => &terminal.id,
 568        }
 569    }
 570}
 571
 572impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 573    fn from(update: acp::ToolCallUpdate) -> Self {
 574        Self::UpdateFields(update)
 575    }
 576}
 577
 578impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 579    fn from(diff: ToolCallUpdateDiff) -> Self {
 580        Self::UpdateDiff(diff)
 581    }
 582}
 583
 584#[derive(Debug, PartialEq)]
 585pub struct ToolCallUpdateDiff {
 586    pub id: acp::ToolCallId,
 587    pub diff: Entity<Diff>,
 588}
 589
 590impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 591    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 592        Self::UpdateTerminal(terminal)
 593    }
 594}
 595
 596#[derive(Debug, PartialEq)]
 597pub struct ToolCallUpdateTerminal {
 598    pub id: acp::ToolCallId,
 599    pub terminal: Entity<Terminal>,
 600}
 601
/// The agent's plan: an ordered list of entries. Defaults to empty.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
 606
/// Summary of a [`Plan`], as computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 613
 614impl Plan {
 615    pub fn is_empty(&self) -> bool {
 616        self.entries.is_empty()
 617    }
 618
 619    pub fn stats(&self) -> PlanStats<'_> {
 620        let mut stats = PlanStats {
 621            in_progress_entry: None,
 622            pending: 0,
 623            completed: 0,
 624        };
 625
 626        for entry in &self.entries {
 627            match &entry.status {
 628                acp::PlanEntryStatus::Pending => {
 629                    stats.pending += 1;
 630                }
 631                acp::PlanEntryStatus::InProgress => {
 632                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 633                }
 634                acp::PlanEntryStatus::Completed => {
 635                    stats.completed += 1;
 636                }
 637            }
 638        }
 639
 640        stats
 641    }
 642}
 643
 644#[derive(Debug)]
 645pub struct PlanEntry {
 646    pub content: Entity<Markdown>,
 647    pub priority: acp::PlanEntryPriority,
 648    pub status: acp::PlanEntryStatus,
 649}
 650
 651impl PlanEntry {
 652    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 653        Self {
 654            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 655            priority: entry.priority,
 656            status: entry.status,
 657        }
 658    }
 659}
 660
/// A thread of user messages, assistant messages and tool calls exchanged
/// with an agent over an [`AgentConnection`].
pub struct AcpThread {
    title: SharedString,
    /// Thread entries in the order they were pushed.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    /// Action log created for `project` when the thread is constructed.
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// `status()` reports `Idle` whenever this is `None`.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
}
 672
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
    /// NOTE(review): presumably the index range of the removed entries —
    /// confirm against the emitter.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Stopped,
    Error,
    ServerExited(ExitStatus),
}

// Lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 685
/// Coarse activity state of an [`AcpThread`], as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is present.
    Idle,
    /// A send task is present and the thread is waiting for tool confirmation.
    WaitingForToolConfirmation,
    /// A send task is present and no tool confirmation is awaited.
    Generating,
}
 692
/// Error describing why loading failed.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// Displayed as `error_message`; the upgrade fields are carried along
    /// for callers and are not part of the `Display` output.
    Unsupported {
        error_message: SharedString,
        upgrade_message: SharedString,
        upgrade_command: String,
    },
    /// The server exited; the payload is shown as the exit status.
    Exited(i32),
    /// Any other failure, described by the contained message.
    Other(SharedString),
}
 703
 704impl Display for LoadError {
 705    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 706        match self {
 707            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 708            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 709            LoadError::Other(msg) => write!(f, "{}", msg),
 710        }
 711    }
 712}
 713
 714impl Error for LoadError {}
 715
 716impl AcpThread {
 717    pub fn new(
 718        title: impl Into<SharedString>,
 719        connection: Rc<dyn AgentConnection>,
 720        project: Entity<Project>,
 721        session_id: acp::SessionId,
 722        cx: &mut Context<Self>,
 723    ) -> Self {
 724        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 725
 726        Self {
 727            action_log,
 728            shared_buffers: Default::default(),
 729            entries: Default::default(),
 730            plan: Default::default(),
 731            title: title.into(),
 732            project,
 733            send_task: None,
 734            connection,
 735            session_id,
 736        }
 737    }
 738
    /// The connection this thread communicates over.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }

    /// The action log created for this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }

    /// The project this thread belongs to.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }

    /// The thread's title (returned as a clone of the stored `SharedString`).
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }

    /// All entries of the thread, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }

    /// The protocol session id of this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
 762
 763    pub fn status(&self) -> ThreadStatus {
 764        if self.send_task.is_some() {
 765            if self.waiting_for_tool_confirmation() {
 766                ThreadStatus::WaitingForToolConfirmation
 767            } else {
 768                ThreadStatus::Generating
 769            }
 770        } else {
 771            ThreadStatus::Idle
 772        }
 773    }
 774
 775    pub fn has_pending_edit_tool_calls(&self) -> bool {
 776        for entry in self.entries.iter().rev() {
 777            match entry {
 778                AgentThreadEntry::UserMessage(_) => return false,
 779                AgentThreadEntry::ToolCall(
 780                    call @ ToolCall {
 781                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
 782                        ..
 783                    },
 784                ) if call.diffs().next().is_some() => {
 785                    return true;
 786                }
 787                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 788            }
 789        }
 790
 791        false
 792    }
 793
 794    pub fn used_tools_since_last_user_message(&self) -> bool {
 795        for entry in self.entries.iter().rev() {
 796            match entry {
 797                AgentThreadEntry::UserMessage(..) => return false,
 798                AgentThreadEntry::AssistantMessage(..) => continue,
 799                AgentThreadEntry::ToolCall(..) => return true,
 800            }
 801        }
 802
 803        false
 804    }
 805
 806    pub fn handle_session_update(
 807        &mut self,
 808        update: acp::SessionUpdate,
 809        cx: &mut Context<Self>,
 810    ) -> Result<(), acp::Error> {
 811        match update {
 812            acp::SessionUpdate::UserMessageChunk { content } => {
 813                self.push_user_content_block(None, content, cx);
 814            }
 815            acp::SessionUpdate::AgentMessageChunk { content } => {
 816                self.push_assistant_content_block(content, false, cx);
 817            }
 818            acp::SessionUpdate::AgentThoughtChunk { content } => {
 819                self.push_assistant_content_block(content, true, cx);
 820            }
 821            acp::SessionUpdate::ToolCall(tool_call) => {
 822                self.upsert_tool_call(tool_call, cx)?;
 823            }
 824            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 825                self.update_tool_call(tool_call_update, cx)?;
 826            }
 827            acp::SessionUpdate::Plan(plan) => {
 828                self.update_plan(plan, cx);
 829            }
 830        }
 831        Ok(())
 832    }
 833
 834    pub fn push_user_content_block(
 835        &mut self,
 836        message_id: Option<UserMessageId>,
 837        chunk: acp::ContentBlock,
 838        cx: &mut Context<Self>,
 839    ) {
 840        let language_registry = self.project.read(cx).languages().clone();
 841        let entries_len = self.entries.len();
 842
 843        if let Some(last_entry) = self.entries.last_mut()
 844            && let AgentThreadEntry::UserMessage(UserMessage {
 845                id,
 846                content,
 847                chunks,
 848                ..
 849            }) = last_entry
 850        {
 851            *id = message_id.or(id.take());
 852            content.append(chunk.clone(), &language_registry, cx);
 853            chunks.push(chunk);
 854            let idx = entries_len - 1;
 855            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 856        } else {
 857            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 858            self.push_entry(
 859                AgentThreadEntry::UserMessage(UserMessage {
 860                    id: message_id,
 861                    content,
 862                    chunks: vec![chunk],
 863                    checkpoint: None,
 864                }),
 865                cx,
 866            );
 867        }
 868    }
 869
 870    pub fn push_assistant_content_block(
 871        &mut self,
 872        chunk: acp::ContentBlock,
 873        is_thought: bool,
 874        cx: &mut Context<Self>,
 875    ) {
 876        let language_registry = self.project.read(cx).languages().clone();
 877        let entries_len = self.entries.len();
 878        if let Some(last_entry) = self.entries.last_mut()
 879            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 880        {
 881            let idx = entries_len - 1;
 882            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 883            match (chunks.last_mut(), is_thought) {
 884                (Some(AssistantMessageChunk::Message { block }), false)
 885                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 886                    block.append(chunk, &language_registry, cx)
 887                }
 888                _ => {
 889                    let block = ContentBlock::new(chunk, &language_registry, cx);
 890                    if is_thought {
 891                        chunks.push(AssistantMessageChunk::Thought { block })
 892                    } else {
 893                        chunks.push(AssistantMessageChunk::Message { block })
 894                    }
 895                }
 896            }
 897        } else {
 898            let block = ContentBlock::new(chunk, &language_registry, cx);
 899            let chunk = if is_thought {
 900                AssistantMessageChunk::Thought { block }
 901            } else {
 902                AssistantMessageChunk::Message { block }
 903            };
 904
 905            self.push_entry(
 906                AgentThreadEntry::AssistantMessage(AssistantMessage {
 907                    chunks: vec![chunk],
 908                }),
 909                cx,
 910            );
 911        }
 912    }
 913
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 918
 919    pub fn update_tool_call(
 920        &mut self,
 921        update: impl Into<ToolCallUpdate>,
 922        cx: &mut Context<Self>,
 923    ) -> Result<()> {
 924        let update = update.into();
 925        let languages = self.project.read(cx).languages().clone();
 926
 927        let (ix, current_call) = self
 928            .tool_call_mut(update.id())
 929            .context("Tool call not found")?;
 930        match update {
 931            ToolCallUpdate::UpdateFields(update) => {
 932                let location_updated = update.fields.locations.is_some();
 933                current_call.update_fields(update.fields, languages, cx);
 934                if location_updated {
 935                    self.resolve_locations(update.id.clone(), cx);
 936                }
 937            }
 938            ToolCallUpdate::UpdateDiff(update) => {
 939                current_call.content.clear();
 940                current_call
 941                    .content
 942                    .push(ToolCallContent::Diff(update.diff));
 943            }
 944            ToolCallUpdate::UpdateTerminal(update) => {
 945                current_call.content.clear();
 946                current_call
 947                    .content
 948                    .push(ToolCallContent::Terminal(update.terminal));
 949            }
 950        }
 951
 952        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 953
 954        Ok(())
 955    }
 956
 957    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 958    pub fn upsert_tool_call(
 959        &mut self,
 960        tool_call: acp::ToolCall,
 961        cx: &mut Context<Self>,
 962    ) -> Result<(), acp::Error> {
 963        let status = tool_call.status.into();
 964        self.upsert_tool_call_inner(tool_call.into(), status, cx)
 965    }
 966
 967    /// Fails if id does not match an existing entry.
 968    pub fn upsert_tool_call_inner(
 969        &mut self,
 970        tool_call_update: acp::ToolCallUpdate,
 971        status: ToolCallStatus,
 972        cx: &mut Context<Self>,
 973    ) -> Result<(), acp::Error> {
 974        let language_registry = self.project.read(cx).languages().clone();
 975        let id = tool_call_update.id.clone();
 976
 977        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
 978            current_call.update_fields(tool_call_update.fields, language_registry, cx);
 979            current_call.status = status;
 980
 981            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 982        } else {
 983            let call =
 984                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
 985            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 986        };
 987
 988        self.resolve_locations(id, cx);
 989        Ok(())
 990    }
 991
 992    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 993        // The tool call we are looking for is typically the last one, or very close to the end.
 994        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
 995        self.entries
 996            .iter_mut()
 997            .enumerate()
 998            .rev()
 999            .find_map(|(index, tool_call)| {
1000                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1001                    && &tool_call.id == id
1002                {
1003                    Some((index, tool_call))
1004                } else {
1005                    None
1006                }
1007            })
1008    }
1009
1010    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1011        let project = self.project.clone();
1012        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1013            return;
1014        };
1015        let task = tool_call.resolve_locations(project, cx);
1016        cx.spawn(async move |this, cx| {
1017            let resolved_locations = task.await;
1018            this.update(cx, |this, cx| {
1019                let project = this.project.clone();
1020                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1021                    return;
1022                };
1023                if let Some(Some(location)) = resolved_locations.last() {
1024                    project.update(cx, |project, cx| {
1025                        if let Some(agent_location) = project.agent_location() {
1026                            let should_ignore = agent_location.buffer == location.buffer
1027                                && location
1028                                    .buffer
1029                                    .update(cx, |buffer, _| {
1030                                        let snapshot = buffer.snapshot();
1031                                        let old_position =
1032                                            agent_location.position.to_point(&snapshot);
1033                                        let new_position = location.position.to_point(&snapshot);
1034                                        // ignore this so that when we get updates from the edit tool
1035                                        // the position doesn't reset to the startof line
1036                                        old_position.row == new_position.row
1037                                            && old_position.column > new_position.column
1038                                    })
1039                                    .ok()
1040                                    .unwrap_or_default();
1041                            if !should_ignore {
1042                                project.set_agent_location(Some(location.clone()), cx);
1043                            }
1044                        }
1045                    });
1046                }
1047                if tool_call.resolved_locations != resolved_locations {
1048                    tool_call.resolved_locations = resolved_locations;
1049                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1050                }
1051            })
1052        })
1053        .detach();
1054    }
1055
1056    pub fn request_tool_call_authorization(
1057        &mut self,
1058        tool_call: acp::ToolCallUpdate,
1059        options: Vec<acp::PermissionOption>,
1060        cx: &mut Context<Self>,
1061    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1062        let (tx, rx) = oneshot::channel();
1063
1064        let status = ToolCallStatus::WaitingForConfirmation {
1065            options,
1066            respond_tx: tx,
1067        };
1068
1069        self.upsert_tool_call_inner(tool_call, status, cx)?;
1070        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1071        Ok(rx)
1072    }
1073
1074    pub fn authorize_tool_call(
1075        &mut self,
1076        id: acp::ToolCallId,
1077        option_id: acp::PermissionOptionId,
1078        option_kind: acp::PermissionOptionKind,
1079        cx: &mut Context<Self>,
1080    ) {
1081        let Some((ix, call)) = self.tool_call_mut(&id) else {
1082            return;
1083        };
1084
1085        let new_status = match option_kind {
1086            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1087                ToolCallStatus::Rejected
1088            }
1089            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1090                ToolCallStatus::InProgress
1091            }
1092        };
1093
1094        let curr_status = mem::replace(&mut call.status, new_status);
1095
1096        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1097            respond_tx.send(option_id).log_err();
1098        } else if cfg!(debug_assertions) {
1099            panic!("tried to authorize an already authorized tool call");
1100        }
1101
1102        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1103    }
1104
1105    /// Returns true if the last turn is awaiting tool authorization
1106    pub fn waiting_for_tool_confirmation(&self) -> bool {
1107        for entry in self.entries.iter().rev() {
1108            match &entry {
1109                AgentThreadEntry::ToolCall(call) => match call.status {
1110                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1111                    ToolCallStatus::Pending
1112                    | ToolCallStatus::InProgress
1113                    | ToolCallStatus::Completed
1114                    | ToolCallStatus::Failed
1115                    | ToolCallStatus::Rejected
1116                    | ToolCallStatus::Canceled => continue,
1117                },
1118                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1119                    // Reached the beginning of the turn
1120                    return false;
1121                }
1122            }
1123        }
1124        false
1125    }
1126
1127    pub fn plan(&self) -> &Plan {
1128        &self.plan
1129    }
1130
1131    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1132        let new_entries_len = request.entries.len();
1133        let mut new_entries = request.entries.into_iter();
1134
1135        // Reuse existing markdown to prevent flickering
1136        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1137            let PlanEntry {
1138                content,
1139                priority,
1140                status,
1141            } = old;
1142            content.update(cx, |old, cx| {
1143                old.replace(new.content, cx);
1144            });
1145            *priority = new.priority;
1146            *status = new.status;
1147        }
1148        for new in new_entries {
1149            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1150        }
1151        self.plan.entries.truncate(new_entries_len);
1152
1153        cx.notify();
1154    }
1155
1156    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1157        self.plan
1158            .entries
1159            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1160        cx.notify();
1161    }
1162
1163    #[cfg(any(test, feature = "test-support"))]
1164    pub fn send_raw(
1165        &mut self,
1166        message: &str,
1167        cx: &mut Context<Self>,
1168    ) -> BoxFuture<'static, Result<()>> {
1169        self.send(
1170            vec![acp::ContentBlock::Text(acp::TextContent {
1171                text: message.to_string(),
1172                annotations: None,
1173            })],
1174            cx,
1175        )
1176    }
1177
1178    pub fn send(
1179        &mut self,
1180        message: Vec<acp::ContentBlock>,
1181        cx: &mut Context<Self>,
1182    ) -> BoxFuture<'static, Result<()>> {
1183        let block = ContentBlock::new_combined(
1184            message.clone(),
1185            self.project.read(cx).languages().clone(),
1186            cx,
1187        );
1188        let request = acp::PromptRequest {
1189            prompt: message.clone(),
1190            session_id: self.session_id.clone(),
1191        };
1192        let git_store = self.project.read(cx).git_store().clone();
1193
1194        let message_id = if self
1195            .connection
1196            .session_editor(&self.session_id, cx)
1197            .is_some()
1198        {
1199            Some(UserMessageId::new())
1200        } else {
1201            None
1202        };
1203
1204        self.run_turn(cx, async move |this, cx| {
1205            this.update(cx, |this, cx| {
1206                this.push_entry(
1207                    AgentThreadEntry::UserMessage(UserMessage {
1208                        id: message_id.clone(),
1209                        content: block,
1210                        chunks: message,
1211                        checkpoint: None,
1212                    }),
1213                    cx,
1214                );
1215            })
1216            .ok();
1217
1218            let old_checkpoint = git_store
1219                .update(cx, |git, cx| git.checkpoint(cx))?
1220                .await
1221                .context("failed to get old checkpoint")
1222                .log_err();
1223            this.update(cx, |this, cx| {
1224                if let Some((_ix, message)) = this.last_user_message() {
1225                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1226                        git_checkpoint,
1227                        show: false,
1228                    });
1229                }
1230                this.connection.prompt(message_id, request, cx)
1231            })?
1232            .await
1233        })
1234    }
1235
1236    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1237        self.run_turn(cx, async move |this, cx| {
1238            this.update(cx, |this, cx| {
1239                this.connection
1240                    .resume(&this.session_id, cx)
1241                    .map(|resume| resume.run(cx))
1242            })?
1243            .context("resuming a session is not supported")?
1244            .await
1245        })
1246    }
1247
1248    fn run_turn(
1249        &mut self,
1250        cx: &mut Context<Self>,
1251        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1252    ) -> BoxFuture<'static, Result<()>> {
1253        self.clear_completed_plan_entries(cx);
1254
1255        let (tx, rx) = oneshot::channel();
1256        let cancel_task = self.cancel(cx);
1257
1258        self.send_task = Some(cx.spawn(async move |this, cx| {
1259            cancel_task.await;
1260            tx.send(f(this, cx).await).ok();
1261        }));
1262
1263        cx.spawn(async move |this, cx| {
1264            let response = rx.await;
1265
1266            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1267                .await?;
1268
1269            this.update(cx, |this, cx| {
1270                match response {
1271                    Ok(Err(e)) => {
1272                        this.send_task.take();
1273                        cx.emit(AcpThreadEvent::Error);
1274                        Err(e)
1275                    }
1276                    result => {
1277                        let canceled = matches!(
1278                            result,
1279                            Ok(Ok(acp::PromptResponse {
1280                                stop_reason: acp::StopReason::Canceled
1281                            }))
1282                        );
1283
1284                        // We only take the task if the current prompt wasn't canceled.
1285                        //
1286                        // This prompt may have been canceled because another one was sent
1287                        // while it was still generating. In these cases, dropping `send_task`
1288                        // would cause the next generation to be canceled.
1289                        if !canceled {
1290                            this.send_task.take();
1291                        }
1292
1293                        cx.emit(AcpThreadEvent::Stopped);
1294                        Ok(())
1295                    }
1296                }
1297            })?
1298        })
1299        .boxed()
1300    }
1301
1302    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1303        let Some(send_task) = self.send_task.take() else {
1304            return Task::ready(());
1305        };
1306
1307        for entry in self.entries.iter_mut() {
1308            if let AgentThreadEntry::ToolCall(call) = entry {
1309                let cancel = matches!(
1310                    call.status,
1311                    ToolCallStatus::Pending
1312                        | ToolCallStatus::WaitingForConfirmation { .. }
1313                        | ToolCallStatus::InProgress
1314                );
1315
1316                if cancel {
1317                    call.status = ToolCallStatus::Canceled;
1318                }
1319            }
1320        }
1321
1322        self.connection.cancel(&self.session_id, cx);
1323
1324        // Wait for the send task to complete
1325        cx.foreground_executor().spawn(send_task)
1326    }
1327
1328    /// Rewinds this thread to before the entry at `index`, removing it and all
1329    /// subsequent entries while reverting any changes made from that point.
1330    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1331        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1332            return Task::ready(Err(anyhow!("not supported")));
1333        };
1334        let Some(message) = self.user_message(&id) else {
1335            return Task::ready(Err(anyhow!("message not found")));
1336        };
1337
1338        let checkpoint = message
1339            .checkpoint
1340            .as_ref()
1341            .map(|c| c.git_checkpoint.clone());
1342
1343        let git_store = self.project.read(cx).git_store().clone();
1344        cx.spawn(async move |this, cx| {
1345            if let Some(checkpoint) = checkpoint {
1346                git_store
1347                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1348                    .await?;
1349            }
1350
1351            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1352                .await?;
1353            this.update(cx, |this, cx| {
1354                if let Some((ix, _)) = this.user_message_mut(&id) {
1355                    let range = ix..this.entries.len();
1356                    this.entries.truncate(ix);
1357                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1358                }
1359            })
1360        })
1361    }
1362
1363    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1364        let git_store = self.project.read(cx).git_store().clone();
1365
1366        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1367            if let Some(checkpoint) = message.checkpoint.as_ref() {
1368                checkpoint.git_checkpoint.clone()
1369            } else {
1370                return Task::ready(Ok(()));
1371            }
1372        } else {
1373            return Task::ready(Ok(()));
1374        };
1375
1376        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1377        cx.spawn(async move |this, cx| {
1378            let new_checkpoint = new_checkpoint
1379                .await
1380                .context("failed to get new checkpoint")
1381                .log_err();
1382            if let Some(new_checkpoint) = new_checkpoint {
1383                let equal = git_store
1384                    .update(cx, |git, cx| {
1385                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1386                    })?
1387                    .await
1388                    .unwrap_or(true);
1389                this.update(cx, |this, cx| {
1390                    let (ix, message) = this.last_user_message().context("no user message")?;
1391                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1392                    checkpoint.show = !equal;
1393                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1394                    anyhow::Ok(())
1395                })??;
1396            }
1397
1398            Ok(())
1399        })
1400    }
1401
1402    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1403        self.entries
1404            .iter_mut()
1405            .enumerate()
1406            .rev()
1407            .find_map(|(ix, entry)| {
1408                if let AgentThreadEntry::UserMessage(message) = entry {
1409                    Some((ix, message))
1410                } else {
1411                    None
1412                }
1413            })
1414    }
1415
1416    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1417        self.entries.iter().find_map(|entry| {
1418            if let AgentThreadEntry::UserMessage(message) = entry {
1419                if message.id.as_ref() == Some(&id) {
1420                    Some(message)
1421                } else {
1422                    None
1423                }
1424            } else {
1425                None
1426            }
1427        })
1428    }
1429
1430    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1431        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1432            if let AgentThreadEntry::UserMessage(message) = entry {
1433                if message.id.as_ref() == Some(&id) {
1434                    Some((ix, message))
1435                } else {
1436                    None
1437                }
1438            } else {
1439                None
1440            }
1441        })
1442    }
1443
1444    pub fn read_text_file(
1445        &self,
1446        path: PathBuf,
1447        line: Option<u32>,
1448        limit: Option<u32>,
1449        reuse_shared_snapshot: bool,
1450        cx: &mut Context<Self>,
1451    ) -> Task<Result<String>> {
1452        let project = self.project.clone();
1453        let action_log = self.action_log.clone();
1454        cx.spawn(async move |this, cx| {
1455            let load = project.update(cx, |project, cx| {
1456                let path = project
1457                    .project_path_for_absolute_path(&path, cx)
1458                    .context("invalid path")?;
1459                anyhow::Ok(project.open_buffer(path, cx))
1460            });
1461            let buffer = load??.await?;
1462
1463            let snapshot = if reuse_shared_snapshot {
1464                this.read_with(cx, |this, _| {
1465                    this.shared_buffers.get(&buffer.clone()).cloned()
1466                })
1467                .log_err()
1468                .flatten()
1469            } else {
1470                None
1471            };
1472
1473            let snapshot = if let Some(snapshot) = snapshot {
1474                snapshot
1475            } else {
1476                action_log.update(cx, |action_log, cx| {
1477                    action_log.buffer_read(buffer.clone(), cx);
1478                })?;
1479                project.update(cx, |project, cx| {
1480                    let position = buffer
1481                        .read(cx)
1482                        .snapshot()
1483                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1484                    project.set_agent_location(
1485                        Some(AgentLocation {
1486                            buffer: buffer.downgrade(),
1487                            position,
1488                        }),
1489                        cx,
1490                    );
1491                })?;
1492
1493                buffer.update(cx, |buffer, _| buffer.snapshot())?
1494            };
1495
1496            this.update(cx, |this, _| {
1497                let text = snapshot.text();
1498                this.shared_buffers.insert(buffer.clone(), snapshot);
1499                if line.is_none() && limit.is_none() {
1500                    return Ok(text);
1501                }
1502                let limit = limit.unwrap_or(u32::MAX) as usize;
1503                let Some(line) = line else {
1504                    return Ok(text.lines().take(limit).collect::<String>());
1505                };
1506
1507                let count = text.lines().count();
1508                if count < line as usize {
1509                    anyhow::bail!("There are only {} lines", count);
1510                }
1511                Ok(text
1512                    .lines()
1513                    .skip(line as usize + 1)
1514                    .take(limit)
1515                    .collect::<String>())
1516            })?
1517        })
1518    }
1519
1520    pub fn write_text_file(
1521        &self,
1522        path: PathBuf,
1523        content: String,
1524        cx: &mut Context<Self>,
1525    ) -> Task<Result<()>> {
1526        let project = self.project.clone();
1527        let action_log = self.action_log.clone();
1528        cx.spawn(async move |this, cx| {
1529            let load = project.update(cx, |project, cx| {
1530                let path = project
1531                    .project_path_for_absolute_path(&path, cx)
1532                    .context("invalid path")?;
1533                anyhow::Ok(project.open_buffer(path, cx))
1534            });
1535            let buffer = load??.await?;
1536            let snapshot = this.update(cx, |this, cx| {
1537                this.shared_buffers
1538                    .get(&buffer)
1539                    .cloned()
1540                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1541            })?;
1542            let edits = cx
1543                .background_executor()
1544                .spawn(async move {
1545                    let old_text = snapshot.text();
1546                    text_diff(old_text.as_str(), &content)
1547                        .into_iter()
1548                        .map(|(range, replacement)| {
1549                            (
1550                                snapshot.anchor_after(range.start)
1551                                    ..snapshot.anchor_before(range.end),
1552                                replacement,
1553                            )
1554                        })
1555                        .collect::<Vec<_>>()
1556                })
1557                .await;
1558            cx.update(|cx| {
1559                project.update(cx, |project, cx| {
1560                    project.set_agent_location(
1561                        Some(AgentLocation {
1562                            buffer: buffer.downgrade(),
1563                            position: edits
1564                                .last()
1565                                .map(|(range, _)| range.end)
1566                                .unwrap_or(Anchor::MIN),
1567                        }),
1568                        cx,
1569                    );
1570                });
1571
1572                action_log.update(cx, |action_log, cx| {
1573                    action_log.buffer_read(buffer.clone(), cx);
1574                });
1575                buffer.update(cx, |buffer, cx| {
1576                    buffer.edit(edits, None, cx);
1577                });
1578                action_log.update(cx, |action_log, cx| {
1579                    action_log.buffer_edited(buffer.clone(), cx);
1580                });
1581            })?;
1582            project
1583                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1584                .await
1585        })
1586    }
1587
1588    pub fn to_markdown(&self, cx: &App) -> String {
1589        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1590    }
1591
1592    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1593        cx.emit(AcpThreadEvent::ServerExited(status));
1594    }
1595}
1596
1597fn markdown_for_raw_output(
1598    raw_output: &serde_json::Value,
1599    language_registry: &Arc<LanguageRegistry>,
1600    cx: &mut App,
1601) -> Option<Entity<Markdown>> {
1602    match raw_output {
1603        serde_json::Value::Null => None,
1604        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1605            Markdown::new(
1606                value.to_string().into(),
1607                Some(language_registry.clone()),
1608                None,
1609                cx,
1610            )
1611        })),
1612        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1613            Markdown::new(
1614                value.to_string().into(),
1615                Some(language_registry.clone()),
1616                None,
1617                cx,
1618            )
1619        })),
1620        serde_json::Value::String(value) => Some(cx.new(|cx| {
1621            Markdown::new(
1622                value.clone().into(),
1623                Some(language_registry.clone()),
1624                None,
1625                cx,
1626            )
1627        })),
1628        value => Some(cx.new(|cx| {
1629            Markdown::new(
1630                format!("```json\n{}\n```", value).into(),
1631                Some(language_registry.clone()),
1632                None,
1633                cx,
1634            )
1635        })),
1636    }
1637}
1638
1639#[cfg(test)]
1640mod tests {
1641    use super::*;
1642    use anyhow::anyhow;
1643    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1644    use gpui::{AsyncApp, TestAppContext, WeakEntity};
1645    use indoc::indoc;
1646    use project::{FakeFs, Fs};
1647    use rand::Rng as _;
1648    use serde_json::json;
1649    use settings::SettingsStore;
1650    use smol::stream::StreamExt as _;
1651    use std::{
1652        any::Any,
1653        cell::RefCell,
1654        path::Path,
1655        rc::Rc,
1656        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1657        time::Duration,
1658    };
1659    use util::path;
1660
1661    fn init_test(cx: &mut TestAppContext) {
1662        env_logger::try_init().ok();
1663        cx.update(|cx| {
1664            let settings_store = SettingsStore::test(cx);
1665            cx.set_global(settings_store);
1666            Project::init_settings(cx);
1667            language::init(cx);
1668        });
1669    }
1670
1671    #[gpui::test]
1672    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1673        init_test(cx);
1674
1675        let fs = FakeFs::new(cx.executor());
1676        let project = Project::test(fs, [], cx).await;
1677        let connection = Rc::new(FakeAgentConnection::new());
1678        let thread = cx
1679            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1680            .await
1681            .unwrap();
1682
1683        // Test creating a new user message
1684        thread.update(cx, |thread, cx| {
1685            thread.push_user_content_block(
1686                None,
1687                acp::ContentBlock::Text(acp::TextContent {
1688                    annotations: None,
1689                    text: "Hello, ".to_string(),
1690                }),
1691                cx,
1692            );
1693        });
1694
1695        thread.update(cx, |thread, cx| {
1696            assert_eq!(thread.entries.len(), 1);
1697            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1698                assert_eq!(user_msg.id, None);
1699                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1700            } else {
1701                panic!("Expected UserMessage");
1702            }
1703        });
1704
1705        // Test appending to existing user message
1706        let message_1_id = UserMessageId::new();
1707        thread.update(cx, |thread, cx| {
1708            thread.push_user_content_block(
1709                Some(message_1_id.clone()),
1710                acp::ContentBlock::Text(acp::TextContent {
1711                    annotations: None,
1712                    text: "world!".to_string(),
1713                }),
1714                cx,
1715            );
1716        });
1717
1718        thread.update(cx, |thread, cx| {
1719            assert_eq!(thread.entries.len(), 1);
1720            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1721                assert_eq!(user_msg.id, Some(message_1_id));
1722                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1723            } else {
1724                panic!("Expected UserMessage");
1725            }
1726        });
1727
1728        // Test creating new user message after assistant message
1729        thread.update(cx, |thread, cx| {
1730            thread.push_assistant_content_block(
1731                acp::ContentBlock::Text(acp::TextContent {
1732                    annotations: None,
1733                    text: "Assistant response".to_string(),
1734                }),
1735                false,
1736                cx,
1737            );
1738        });
1739
1740        let message_2_id = UserMessageId::new();
1741        thread.update(cx, |thread, cx| {
1742            thread.push_user_content_block(
1743                Some(message_2_id.clone()),
1744                acp::ContentBlock::Text(acp::TextContent {
1745                    annotations: None,
1746                    text: "New user message".to_string(),
1747                }),
1748                cx,
1749            );
1750        });
1751
1752        thread.update(cx, |thread, cx| {
1753            assert_eq!(thread.entries.len(), 3);
1754            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1755                assert_eq!(user_msg.id, Some(message_2_id));
1756                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1757            } else {
1758                panic!("Expected UserMessage at index 2");
1759            }
1760        });
1761    }
1762
1763    #[gpui::test]
1764    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1765        init_test(cx);
1766
1767        let fs = FakeFs::new(cx.executor());
1768        let project = Project::test(fs, [], cx).await;
1769        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1770            |_, thread, mut cx| {
1771                async move {
1772                    thread.update(&mut cx, |thread, cx| {
1773                        thread
1774                            .handle_session_update(
1775                                acp::SessionUpdate::AgentThoughtChunk {
1776                                    content: "Thinking ".into(),
1777                                },
1778                                cx,
1779                            )
1780                            .unwrap();
1781                        thread
1782                            .handle_session_update(
1783                                acp::SessionUpdate::AgentThoughtChunk {
1784                                    content: "hard!".into(),
1785                                },
1786                                cx,
1787                            )
1788                            .unwrap();
1789                    })?;
1790                    Ok(acp::PromptResponse {
1791                        stop_reason: acp::StopReason::EndTurn,
1792                    })
1793                }
1794                .boxed_local()
1795            },
1796        ));
1797
1798        let thread = cx
1799            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1800            .await
1801            .unwrap();
1802
1803        thread
1804            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1805            .await
1806            .unwrap();
1807
1808        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1809        assert_eq!(
1810            output,
1811            indoc! {r#"
1812            ## User
1813
1814            Hello from Zed!
1815
1816            ## Assistant
1817
1818            <thinking>
1819            Thinking hard!
1820            </thinking>
1821
1822            "#}
1823        );
1824    }
1825
1826    #[gpui::test]
1827    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1828        init_test(cx);
1829
1830        let fs = FakeFs::new(cx.executor());
1831        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1832            .await;
1833        let project = Project::test(fs.clone(), [], cx).await;
1834        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1835        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1836        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1837            move |_, thread, mut cx| {
1838                let read_file_tx = read_file_tx.clone();
1839                async move {
1840                    let content = thread
1841                        .update(&mut cx, |thread, cx| {
1842                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1843                        })
1844                        .unwrap()
1845                        .await
1846                        .unwrap();
1847                    assert_eq!(content, "one\ntwo\nthree\n");
1848                    read_file_tx.take().unwrap().send(()).unwrap();
1849                    thread
1850                        .update(&mut cx, |thread, cx| {
1851                            thread.write_text_file(
1852                                path!("/tmp/foo").into(),
1853                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1854                                cx,
1855                            )
1856                        })
1857                        .unwrap()
1858                        .await
1859                        .unwrap();
1860                    Ok(acp::PromptResponse {
1861                        stop_reason: acp::StopReason::EndTurn,
1862                    })
1863                }
1864                .boxed_local()
1865            },
1866        ));
1867
1868        let (worktree, pathbuf) = project
1869            .update(cx, |project, cx| {
1870                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1871            })
1872            .await
1873            .unwrap();
1874        let buffer = project
1875            .update(cx, |project, cx| {
1876                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1877            })
1878            .await
1879            .unwrap();
1880
1881        let thread = cx
1882            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1883            .await
1884            .unwrap();
1885
1886        let request = thread.update(cx, |thread, cx| {
1887            thread.send_raw("Extend the count in /tmp/foo", cx)
1888        });
1889        read_file_rx.await.ok();
1890        buffer.update(cx, |buffer, cx| {
1891            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1892        });
1893        cx.run_until_parked();
1894        assert_eq!(
1895            buffer.read_with(cx, |buffer, _| buffer.text()),
1896            "zero\none\ntwo\nthree\nfour\nfive\n"
1897        );
1898        assert_eq!(
1899            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1900            "zero\none\ntwo\nthree\nfour\nfive\n"
1901        );
1902        request.await.unwrap();
1903    }
1904
1905    #[gpui::test]
1906    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1907        init_test(cx);
1908
1909        let fs = FakeFs::new(cx.executor());
1910        let project = Project::test(fs, [], cx).await;
1911        let id = acp::ToolCallId("test".into());
1912
1913        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1914            let id = id.clone();
1915            move |_, thread, mut cx| {
1916                let id = id.clone();
1917                async move {
1918                    thread
1919                        .update(&mut cx, |thread, cx| {
1920                            thread.handle_session_update(
1921                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1922                                    id: id.clone(),
1923                                    title: "Label".into(),
1924                                    kind: acp::ToolKind::Fetch,
1925                                    status: acp::ToolCallStatus::InProgress,
1926                                    content: vec![],
1927                                    locations: vec![],
1928                                    raw_input: None,
1929                                    raw_output: None,
1930                                }),
1931                                cx,
1932                            )
1933                        })
1934                        .unwrap()
1935                        .unwrap();
1936                    Ok(acp::PromptResponse {
1937                        stop_reason: acp::StopReason::EndTurn,
1938                    })
1939                }
1940                .boxed_local()
1941            }
1942        }));
1943
1944        let thread = cx
1945            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1946            .await
1947            .unwrap();
1948
1949        let request = thread.update(cx, |thread, cx| {
1950            thread.send_raw("Fetch https://example.com", cx)
1951        });
1952
1953        run_until_first_tool_call(&thread, cx).await;
1954
1955        thread.read_with(cx, |thread, _| {
1956            assert!(matches!(
1957                thread.entries[1],
1958                AgentThreadEntry::ToolCall(ToolCall {
1959                    status: ToolCallStatus::InProgress,
1960                    ..
1961                })
1962            ));
1963        });
1964
1965        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1966
1967        thread.read_with(cx, |thread, _| {
1968            assert!(matches!(
1969                &thread.entries[1],
1970                AgentThreadEntry::ToolCall(ToolCall {
1971                    status: ToolCallStatus::Canceled,
1972                    ..
1973                })
1974            ));
1975        });
1976
1977        thread
1978            .update(cx, |thread, cx| {
1979                thread.handle_session_update(
1980                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1981                        id,
1982                        fields: acp::ToolCallUpdateFields {
1983                            status: Some(acp::ToolCallStatus::Completed),
1984                            ..Default::default()
1985                        },
1986                    }),
1987                    cx,
1988                )
1989            })
1990            .unwrap();
1991
1992        request.await.unwrap();
1993
1994        thread.read_with(cx, |thread, _| {
1995            assert!(matches!(
1996                thread.entries[1],
1997                AgentThreadEntry::ToolCall(ToolCall {
1998                    status: ToolCallStatus::Completed,
1999                    ..
2000                })
2001            ));
2002        });
2003    }
2004
2005    #[gpui::test]
2006    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2007        init_test(cx);
2008        let fs = FakeFs::new(cx.background_executor.clone());
2009        fs.insert_tree(path!("/test"), json!({})).await;
2010        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2011
2012        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2013            move |_, thread, mut cx| {
2014                async move {
2015                    thread
2016                        .update(&mut cx, |thread, cx| {
2017                            thread.handle_session_update(
2018                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2019                                    id: acp::ToolCallId("test".into()),
2020                                    title: "Label".into(),
2021                                    kind: acp::ToolKind::Edit,
2022                                    status: acp::ToolCallStatus::Completed,
2023                                    content: vec![acp::ToolCallContent::Diff {
2024                                        diff: acp::Diff {
2025                                            path: "/test/test.txt".into(),
2026                                            old_text: None,
2027                                            new_text: "foo".into(),
2028                                        },
2029                                    }],
2030                                    locations: vec![],
2031                                    raw_input: None,
2032                                    raw_output: None,
2033                                }),
2034                                cx,
2035                            )
2036                        })
2037                        .unwrap()
2038                        .unwrap();
2039                    Ok(acp::PromptResponse {
2040                        stop_reason: acp::StopReason::EndTurn,
2041                    })
2042                }
2043                .boxed_local()
2044            }
2045        }));
2046
2047        let thread = cx
2048            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2049            .await
2050            .unwrap();
2051
2052        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2053            .await
2054            .unwrap();
2055
2056        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2057    }
2058
2059    #[gpui::test(iterations = 10)]
2060    async fn test_checkpoints(cx: &mut TestAppContext) {
2061        init_test(cx);
2062        let fs = FakeFs::new(cx.background_executor.clone());
2063        fs.insert_tree(
2064            path!("/test"),
2065            json!({
2066                ".git": {}
2067            }),
2068        )
2069        .await;
2070        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2071
2072        let simulate_changes = Arc::new(AtomicBool::new(true));
2073        let next_filename = Arc::new(AtomicUsize::new(0));
2074        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2075            let simulate_changes = simulate_changes.clone();
2076            let next_filename = next_filename.clone();
2077            let fs = fs.clone();
2078            move |request, thread, mut cx| {
2079                let fs = fs.clone();
2080                let simulate_changes = simulate_changes.clone();
2081                let next_filename = next_filename.clone();
2082                async move {
2083                    if simulate_changes.load(SeqCst) {
2084                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2085                        fs.write(Path::new(&filename), b"").await?;
2086                    }
2087
2088                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2089                        panic!("expected text content block");
2090                    };
2091                    thread.update(&mut cx, |thread, cx| {
2092                        thread
2093                            .handle_session_update(
2094                                acp::SessionUpdate::AgentMessageChunk {
2095                                    content: content.text.to_uppercase().into(),
2096                                },
2097                                cx,
2098                            )
2099                            .unwrap();
2100                    })?;
2101                    Ok(acp::PromptResponse {
2102                        stop_reason: acp::StopReason::EndTurn,
2103                    })
2104                }
2105                .boxed_local()
2106            }
2107        }));
2108        let thread = cx
2109            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2110            .await
2111            .unwrap();
2112
2113        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2114            .await
2115            .unwrap();
2116        thread.read_with(cx, |thread, cx| {
2117            assert_eq!(
2118                thread.to_markdown(cx),
2119                indoc! {"
2120                    ## User (checkpoint)
2121
2122                    Lorem
2123
2124                    ## Assistant
2125
2126                    LOREM
2127
2128                "}
2129            );
2130        });
2131        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2132
2133        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2134            .await
2135            .unwrap();
2136        thread.read_with(cx, |thread, cx| {
2137            assert_eq!(
2138                thread.to_markdown(cx),
2139                indoc! {"
2140                    ## User (checkpoint)
2141
2142                    Lorem
2143
2144                    ## Assistant
2145
2146                    LOREM
2147
2148                    ## User (checkpoint)
2149
2150                    ipsum
2151
2152                    ## Assistant
2153
2154                    IPSUM
2155
2156                "}
2157            );
2158        });
2159        assert_eq!(
2160            fs.files(),
2161            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2162        );
2163
2164        // Checkpoint isn't stored when there are no changes.
2165        simulate_changes.store(false, SeqCst);
2166        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2167            .await
2168            .unwrap();
2169        thread.read_with(cx, |thread, cx| {
2170            assert_eq!(
2171                thread.to_markdown(cx),
2172                indoc! {"
2173                    ## User (checkpoint)
2174
2175                    Lorem
2176
2177                    ## Assistant
2178
2179                    LOREM
2180
2181                    ## User (checkpoint)
2182
2183                    ipsum
2184
2185                    ## Assistant
2186
2187                    IPSUM
2188
2189                    ## User
2190
2191                    dolor
2192
2193                    ## Assistant
2194
2195                    DOLOR
2196
2197                "}
2198            );
2199        });
2200        assert_eq!(
2201            fs.files(),
2202            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2203        );
2204
2205        // Rewinding the conversation truncates the history and restores the checkpoint.
2206        thread
2207            .update(cx, |thread, cx| {
2208                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2209                    panic!("unexpected entries {:?}", thread.entries)
2210                };
2211                thread.rewind(message.id.clone().unwrap(), cx)
2212            })
2213            .await
2214            .unwrap();
2215        thread.read_with(cx, |thread, cx| {
2216            assert_eq!(
2217                thread.to_markdown(cx),
2218                indoc! {"
2219                    ## User (checkpoint)
2220
2221                    Lorem
2222
2223                    ## Assistant
2224
2225                    LOREM
2226
2227                "}
2228            );
2229        });
2230        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2231    }
2232
2233    async fn run_until_first_tool_call(
2234        thread: &Entity<AcpThread>,
2235        cx: &mut TestAppContext,
2236    ) -> usize {
2237        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2238
2239        let subscription = cx.update(|cx| {
2240            cx.subscribe(thread, move |thread, _, cx| {
2241                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2242                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2243                        return tx.try_send(ix).unwrap();
2244                    }
2245                }
2246            })
2247        });
2248
2249        select! {
2250            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2251                panic!("Timeout waiting for tool call")
2252            }
2253            ix = rx.next().fuse() => {
2254                drop(subscription);
2255                ix.unwrap()
2256            }
2257        }
2258    }
2259
2260    #[derive(Clone, Default)]
2261    struct FakeAgentConnection {
2262        auth_methods: Vec<acp::AuthMethod>,
2263        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2264        on_user_message: Option<
2265            Rc<
2266                dyn Fn(
2267                        acp::PromptRequest,
2268                        WeakEntity<AcpThread>,
2269                        AsyncApp,
2270                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2271                    + 'static,
2272            >,
2273        >,
2274    }
2275
2276    impl FakeAgentConnection {
2277        fn new() -> Self {
2278            Self {
2279                auth_methods: Vec::new(),
2280                on_user_message: None,
2281                sessions: Arc::default(),
2282            }
2283        }
2284
2285        #[expect(unused)]
2286        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2287            self.auth_methods = auth_methods;
2288            self
2289        }
2290
2291        fn on_user_message(
2292            mut self,
2293            handler: impl Fn(
2294                acp::PromptRequest,
2295                WeakEntity<AcpThread>,
2296                AsyncApp,
2297            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2298            + 'static,
2299        ) -> Self {
2300            self.on_user_message.replace(Rc::new(handler));
2301            self
2302        }
2303    }
2304
2305    impl AgentConnection for FakeAgentConnection {
2306        fn auth_methods(&self) -> &[acp::AuthMethod] {
2307            &self.auth_methods
2308        }
2309
2310        fn new_thread(
2311            self: Rc<Self>,
2312            project: Entity<Project>,
2313            _cwd: &Path,
2314            cx: &mut gpui::App,
2315        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2316            let session_id = acp::SessionId(
2317                rand::thread_rng()
2318                    .sample_iter(&rand::distributions::Alphanumeric)
2319                    .take(7)
2320                    .map(char::from)
2321                    .collect::<String>()
2322                    .into(),
2323            );
2324            let thread =
2325                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2326            self.sessions.lock().insert(session_id, thread.downgrade());
2327            Task::ready(Ok(thread))
2328        }
2329
2330        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2331            if self.auth_methods().iter().any(|m| m.id == method) {
2332                Task::ready(Ok(()))
2333            } else {
2334                Task::ready(Err(anyhow!("Invalid Auth Method")))
2335            }
2336        }
2337
2338        fn prompt(
2339            &self,
2340            _id: Option<UserMessageId>,
2341            params: acp::PromptRequest,
2342            cx: &mut App,
2343        ) -> Task<gpui::Result<acp::PromptResponse>> {
2344            let sessions = self.sessions.lock();
2345            let thread = sessions.get(&params.session_id).unwrap();
2346            if let Some(handler) = &self.on_user_message {
2347                let handler = handler.clone();
2348                let thread = thread.clone();
2349                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2350            } else {
2351                Task::ready(Ok(acp::PromptResponse {
2352                    stop_reason: acp::StopReason::EndTurn,
2353                }))
2354            }
2355        }
2356
2357        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2358            let sessions = self.sessions.lock();
2359            let thread = sessions.get(&session_id).unwrap().clone();
2360
2361            cx.spawn(async move |cx| {
2362                thread
2363                    .update(cx, |thread, cx| thread.cancel(cx))
2364                    .unwrap()
2365                    .await
2366            })
2367            .detach();
2368        }
2369
2370        fn session_editor(
2371            &self,
2372            session_id: &acp::SessionId,
2373            _cx: &mut App,
2374        ) -> Option<Rc<dyn AgentSessionEditor>> {
2375            Some(Rc::new(FakeAgentSessionEditor {
2376                _session_id: session_id.clone(),
2377            }))
2378        }
2379
2380        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2381            self
2382        }
2383    }
2384
2385    struct FakeAgentSessionEditor {
2386        _session_id: acp::SessionId,
2387    }
2388
2389    impl AgentSessionEditor for FakeAgentSessionEditor {
2390        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2391            Task::ready(Ok(()))
2392        }
2393    }
2394}