acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9pub use terminal::*;
  10
  11use action_log::ActionLog;
  12use agent_client_protocol as acp;
  13use anyhow::{Context as _, Result, anyhow};
  14use editor::Bias;
  15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  17use itertools::Itertools;
  18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  19use markdown::Markdown;
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use std::collections::HashMap;
  22use std::error::Error;
  23use std::fmt::{Formatter, Write};
  24use std::ops::Range;
  25use std::process::ExitStatus;
  26use std::rc::Rc;
  27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  28use ui::App;
  29use util::ResultExt;
  30
  31#[derive(Debug)]
  32pub struct UserMessage {
  33    pub id: Option<UserMessageId>,
  34    pub content: ContentBlock,
  35    pub chunks: Vec<acp::ContentBlock>,
  36    pub checkpoint: Option<Checkpoint>,
  37}
  38
  39#[derive(Debug)]
  40pub struct Checkpoint {
  41    git_checkpoint: GitStoreCheckpoint,
  42    pub show: bool,
  43}
  44
  45impl UserMessage {
  46    fn to_markdown(&self, cx: &App) -> String {
  47        let mut markdown = String::new();
  48        if self
  49            .checkpoint
  50            .as_ref()
  51            .map_or(false, |checkpoint| checkpoint.show)
  52        {
  53            writeln!(markdown, "## User (checkpoint)").unwrap();
  54        } else {
  55            writeln!(markdown, "## User").unwrap();
  56        }
  57        writeln!(markdown).unwrap();
  58        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  59        writeln!(markdown).unwrap();
  60        markdown
  61    }
  62}
  63
  64#[derive(Debug, PartialEq)]
  65pub struct AssistantMessage {
  66    pub chunks: Vec<AssistantMessageChunk>,
  67}
  68
  69impl AssistantMessage {
  70    pub fn to_markdown(&self, cx: &App) -> String {
  71        format!(
  72            "## Assistant\n\n{}\n\n",
  73            self.chunks
  74                .iter()
  75                .map(|chunk| chunk.to_markdown(cx))
  76                .join("\n\n")
  77        )
  78    }
  79}
  80
  81#[derive(Debug, PartialEq)]
  82pub enum AssistantMessageChunk {
  83    Message { block: ContentBlock },
  84    Thought { block: ContentBlock },
  85}
  86
  87impl AssistantMessageChunk {
  88    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  89        Self::Message {
  90            block: ContentBlock::new(chunk.into(), language_registry, cx),
  91        }
  92    }
  93
  94    fn to_markdown(&self, cx: &App) -> String {
  95        match self {
  96            Self::Message { block } => block.to_markdown(cx).to_string(),
  97            Self::Thought { block } => {
  98                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
  99            }
 100        }
 101    }
 102}
 103
 104#[derive(Debug)]
 105pub enum AgentThreadEntry {
 106    UserMessage(UserMessage),
 107    AssistantMessage(AssistantMessage),
 108    ToolCall(ToolCall),
 109}
 110
 111impl AgentThreadEntry {
 112    pub fn to_markdown(&self, cx: &App) -> String {
 113        match self {
 114            Self::UserMessage(message) => message.to_markdown(cx),
 115            Self::AssistantMessage(message) => message.to_markdown(cx),
 116            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 117        }
 118    }
 119
 120    pub fn user_message(&self) -> Option<&UserMessage> {
 121        if let AgentThreadEntry::UserMessage(message) = self {
 122            Some(message)
 123        } else {
 124            None
 125        }
 126    }
 127
 128    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 129        if let AgentThreadEntry::ToolCall(call) = self {
 130            itertools::Either::Left(call.diffs())
 131        } else {
 132            itertools::Either::Right(std::iter::empty())
 133        }
 134    }
 135
 136    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 137        if let AgentThreadEntry::ToolCall(call) = self {
 138            itertools::Either::Left(call.terminals())
 139        } else {
 140            itertools::Either::Right(std::iter::empty())
 141        }
 142    }
 143
 144    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 145        if let AgentThreadEntry::ToolCall(ToolCall {
 146            locations,
 147            resolved_locations,
 148            ..
 149        }) = self
 150        {
 151            Some((
 152                locations.get(ix)?.clone(),
 153                resolved_locations.get(ix)?.clone()?,
 154            ))
 155        } else {
 156            None
 157        }
 158    }
 159}
 160
 161#[derive(Debug)]
 162pub struct ToolCall {
 163    pub id: acp::ToolCallId,
 164    pub label: Entity<Markdown>,
 165    pub kind: acp::ToolKind,
 166    pub content: Vec<ToolCallContent>,
 167    pub status: ToolCallStatus,
 168    pub locations: Vec<acp::ToolCallLocation>,
 169    pub resolved_locations: Vec<Option<AgentLocation>>,
 170    pub raw_input: Option<serde_json::Value>,
 171    pub raw_output: Option<serde_json::Value>,
 172}
 173
 174impl ToolCall {
 175    fn from_acp(
 176        tool_call: acp::ToolCall,
 177        status: ToolCallStatus,
 178        language_registry: Arc<LanguageRegistry>,
 179        cx: &mut App,
 180    ) -> Self {
 181        Self {
 182            id: tool_call.id,
 183            label: cx.new(|cx| {
 184                Markdown::new(
 185                    tool_call.title.into(),
 186                    Some(language_registry.clone()),
 187                    None,
 188                    cx,
 189                )
 190            }),
 191            kind: tool_call.kind,
 192            content: tool_call
 193                .content
 194                .into_iter()
 195                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 196                .collect(),
 197            locations: tool_call.locations,
 198            resolved_locations: Vec::default(),
 199            status,
 200            raw_input: tool_call.raw_input,
 201            raw_output: tool_call.raw_output,
 202        }
 203    }
 204
 205    fn update_fields(
 206        &mut self,
 207        fields: acp::ToolCallUpdateFields,
 208        language_registry: Arc<LanguageRegistry>,
 209        cx: &mut App,
 210    ) {
 211        let acp::ToolCallUpdateFields {
 212            kind,
 213            status,
 214            title,
 215            content,
 216            locations,
 217            raw_input,
 218            raw_output,
 219        } = fields;
 220
 221        if let Some(kind) = kind {
 222            self.kind = kind;
 223        }
 224
 225        if let Some(status) = status {
 226            self.status = ToolCallStatus::Allowed { status };
 227        }
 228
 229        if let Some(title) = title {
 230            self.label.update(cx, |label, cx| {
 231                label.replace(title, cx);
 232            });
 233        }
 234
 235        if let Some(content) = content {
 236            self.content = content
 237                .into_iter()
 238                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 239                .collect();
 240        }
 241
 242        if let Some(locations) = locations {
 243            self.locations = locations;
 244        }
 245
 246        if let Some(raw_input) = raw_input {
 247            self.raw_input = Some(raw_input);
 248        }
 249
 250        if let Some(raw_output) = raw_output {
 251            if self.content.is_empty() {
 252                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 253                {
 254                    self.content
 255                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 256                            markdown,
 257                        }));
 258                }
 259            }
 260            self.raw_output = Some(raw_output);
 261        }
 262    }
 263
 264    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 265        self.content.iter().filter_map(|content| match content {
 266            ToolCallContent::Diff(diff) => Some(diff),
 267            ToolCallContent::ContentBlock(_) => None,
 268            ToolCallContent::Terminal(_) => None,
 269        })
 270    }
 271
 272    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 273        self.content.iter().filter_map(|content| match content {
 274            ToolCallContent::Terminal(terminal) => Some(terminal),
 275            ToolCallContent::ContentBlock(_) => None,
 276            ToolCallContent::Diff(_) => None,
 277        })
 278    }
 279
 280    fn to_markdown(&self, cx: &App) -> String {
 281        let mut markdown = format!(
 282            "**Tool Call: {}**\nStatus: {}\n\n",
 283            self.label.read(cx).source(),
 284            self.status
 285        );
 286        for content in &self.content {
 287            markdown.push_str(content.to_markdown(cx).as_str());
 288            markdown.push_str("\n\n");
 289        }
 290        markdown
 291    }
 292
 293    async fn resolve_location(
 294        location: acp::ToolCallLocation,
 295        project: WeakEntity<Project>,
 296        cx: &mut AsyncApp,
 297    ) -> Option<AgentLocation> {
 298        let buffer = project
 299            .update(cx, |project, cx| {
 300                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 301                    Some(project.open_buffer(path, cx))
 302                } else {
 303                    None
 304                }
 305            })
 306            .ok()??;
 307        let buffer = buffer.await.log_err()?;
 308        let position = buffer
 309            .update(cx, |buffer, _| {
 310                if let Some(row) = location.line {
 311                    let snapshot = buffer.snapshot();
 312                    let column = snapshot.indent_size_for_line(row).len;
 313                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 314                    snapshot.anchor_before(point)
 315                } else {
 316                    Anchor::MIN
 317                }
 318            })
 319            .ok()?;
 320
 321        Some(AgentLocation {
 322            buffer: buffer.downgrade(),
 323            position,
 324        })
 325    }
 326
 327    fn resolve_locations(
 328        &self,
 329        project: Entity<Project>,
 330        cx: &mut App,
 331    ) -> Task<Vec<Option<AgentLocation>>> {
 332        let locations = self.locations.clone();
 333        project.update(cx, |_, cx| {
 334            cx.spawn(async move |project, cx| {
 335                let mut new_locations = Vec::new();
 336                for location in locations {
 337                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 338                }
 339                new_locations
 340            })
 341        })
 342    }
 343}
 344
 345#[derive(Debug)]
 346pub enum ToolCallStatus {
 347    WaitingForConfirmation {
 348        options: Vec<acp::PermissionOption>,
 349        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 350    },
 351    Allowed {
 352        status: acp::ToolCallStatus,
 353    },
 354    Rejected,
 355    Canceled,
 356}
 357
 358impl Display for ToolCallStatus {
 359    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 360        write!(
 361            f,
 362            "{}",
 363            match self {
 364                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 365                ToolCallStatus::Allowed { status } => match status {
 366                    acp::ToolCallStatus::Pending => "Pending",
 367                    acp::ToolCallStatus::InProgress => "In Progress",
 368                    acp::ToolCallStatus::Completed => "Completed",
 369                    acp::ToolCallStatus::Failed => "Failed",
 370                },
 371                ToolCallStatus::Rejected => "Rejected",
 372                ToolCallStatus::Canceled => "Canceled",
 373            }
 374        )
 375    }
 376}
 377
 378#[derive(Debug, PartialEq, Clone)]
 379pub enum ContentBlock {
 380    Empty,
 381    Markdown { markdown: Entity<Markdown> },
 382    ResourceLink { resource_link: acp::ResourceLink },
 383}
 384
 385impl ContentBlock {
 386    pub fn new(
 387        block: acp::ContentBlock,
 388        language_registry: &Arc<LanguageRegistry>,
 389        cx: &mut App,
 390    ) -> Self {
 391        let mut this = Self::Empty;
 392        this.append(block, language_registry, cx);
 393        this
 394    }
 395
 396    pub fn new_combined(
 397        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 398        language_registry: Arc<LanguageRegistry>,
 399        cx: &mut App,
 400    ) -> Self {
 401        let mut this = Self::Empty;
 402        for block in blocks {
 403            this.append(block, &language_registry, cx);
 404        }
 405        this
 406    }
 407
 408    pub fn append(
 409        &mut self,
 410        block: acp::ContentBlock,
 411        language_registry: &Arc<LanguageRegistry>,
 412        cx: &mut App,
 413    ) {
 414        if matches!(self, ContentBlock::Empty) {
 415            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 416                *self = ContentBlock::ResourceLink { resource_link };
 417                return;
 418            }
 419        }
 420
 421        let new_content = self.block_string_contents(block);
 422
 423        match self {
 424            ContentBlock::Empty => {
 425                *self = Self::create_markdown_block(new_content, language_registry, cx);
 426            }
 427            ContentBlock::Markdown { markdown } => {
 428                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 429            }
 430            ContentBlock::ResourceLink { resource_link } => {
 431                let existing_content = Self::resource_link_md(&resource_link.uri);
 432                let combined = format!("{}\n{}", existing_content, new_content);
 433
 434                *self = Self::create_markdown_block(combined, language_registry, cx);
 435            }
 436        }
 437    }
 438
 439    fn create_markdown_block(
 440        content: String,
 441        language_registry: &Arc<LanguageRegistry>,
 442        cx: &mut App,
 443    ) -> ContentBlock {
 444        ContentBlock::Markdown {
 445            markdown: cx
 446                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 447        }
 448    }
 449
 450    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 451        match block {
 452            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 453            acp::ContentBlock::ResourceLink(resource_link) => {
 454                Self::resource_link_md(&resource_link.uri)
 455            }
 456            acp::ContentBlock::Resource(acp::EmbeddedResource {
 457                resource:
 458                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 459                        uri,
 460                        ..
 461                    }),
 462                ..
 463            }) => Self::resource_link_md(&uri),
 464            acp::ContentBlock::Image(image) => Self::image_md(&image),
 465            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 466        }
 467    }
 468
 469    fn resource_link_md(uri: &str) -> String {
 470        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 471            uri.as_link().to_string()
 472        } else {
 473            uri.to_string()
 474        }
 475    }
 476
 477    fn image_md(_image: &acp::ImageContent) -> String {
 478        "`Image`".into()
 479    }
 480
 481    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 482        match self {
 483            ContentBlock::Empty => "",
 484            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 485            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 486        }
 487    }
 488
 489    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 490        match self {
 491            ContentBlock::Empty => None,
 492            ContentBlock::Markdown { markdown } => Some(markdown),
 493            ContentBlock::ResourceLink { .. } => None,
 494        }
 495    }
 496
 497    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 498        match self {
 499            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 500            _ => None,
 501        }
 502    }
 503}
 504
 505#[derive(Debug)]
 506pub enum ToolCallContent {
 507    ContentBlock(ContentBlock),
 508    Diff(Entity<Diff>),
 509    Terminal(Entity<Terminal>),
 510}
 511
 512impl ToolCallContent {
 513    pub fn from_acp(
 514        content: acp::ToolCallContent,
 515        language_registry: Arc<LanguageRegistry>,
 516        cx: &mut App,
 517    ) -> Self {
 518        match content {
 519            acp::ToolCallContent::Content { content } => {
 520                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 521            }
 522            acp::ToolCallContent::Diff { diff } => {
 523                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 524            }
 525        }
 526    }
 527
 528    pub fn to_markdown(&self, cx: &App) -> String {
 529        match self {
 530            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 531            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 532            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 533        }
 534    }
 535}
 536
 537#[derive(Debug, PartialEq)]
 538pub enum ToolCallUpdate {
 539    UpdateFields(acp::ToolCallUpdate),
 540    UpdateDiff(ToolCallUpdateDiff),
 541    UpdateTerminal(ToolCallUpdateTerminal),
 542}
 543
 544impl ToolCallUpdate {
 545    fn id(&self) -> &acp::ToolCallId {
 546        match self {
 547            Self::UpdateFields(update) => &update.id,
 548            Self::UpdateDiff(diff) => &diff.id,
 549            Self::UpdateTerminal(terminal) => &terminal.id,
 550        }
 551    }
 552}
 553
 554impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 555    fn from(update: acp::ToolCallUpdate) -> Self {
 556        Self::UpdateFields(update)
 557    }
 558}
 559
 560impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 561    fn from(diff: ToolCallUpdateDiff) -> Self {
 562        Self::UpdateDiff(diff)
 563    }
 564}
 565
 566#[derive(Debug, PartialEq)]
 567pub struct ToolCallUpdateDiff {
 568    pub id: acp::ToolCallId,
 569    pub diff: Entity<Diff>,
 570}
 571
 572impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 573    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 574        Self::UpdateTerminal(terminal)
 575    }
 576}
 577
 578#[derive(Debug, PartialEq)]
 579pub struct ToolCallUpdateTerminal {
 580    pub id: acp::ToolCallId,
 581    pub terminal: Entity<Terminal>,
 582}
 583
 584#[derive(Debug, Default)]
 585pub struct Plan {
 586    pub entries: Vec<PlanEntry>,
 587}
 588
 589#[derive(Debug)]
 590pub struct PlanStats<'a> {
 591    pub in_progress_entry: Option<&'a PlanEntry>,
 592    pub pending: u32,
 593    pub completed: u32,
 594}
 595
 596impl Plan {
 597    pub fn is_empty(&self) -> bool {
 598        self.entries.is_empty()
 599    }
 600
 601    pub fn stats(&self) -> PlanStats<'_> {
 602        let mut stats = PlanStats {
 603            in_progress_entry: None,
 604            pending: 0,
 605            completed: 0,
 606        };
 607
 608        for entry in &self.entries {
 609            match &entry.status {
 610                acp::PlanEntryStatus::Pending => {
 611                    stats.pending += 1;
 612                }
 613                acp::PlanEntryStatus::InProgress => {
 614                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 615                }
 616                acp::PlanEntryStatus::Completed => {
 617                    stats.completed += 1;
 618                }
 619            }
 620        }
 621
 622        stats
 623    }
 624}
 625
 626#[derive(Debug)]
 627pub struct PlanEntry {
 628    pub content: Entity<Markdown>,
 629    pub priority: acp::PlanEntryPriority,
 630    pub status: acp::PlanEntryStatus,
 631}
 632
 633impl PlanEntry {
 634    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 635        Self {
 636            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 637            priority: entry.priority,
 638            status: entry.status,
 639        }
 640    }
 641}
 642
 643pub struct AcpThread {
 644    title: SharedString,
 645    entries: Vec<AgentThreadEntry>,
 646    plan: Plan,
 647    project: Entity<Project>,
 648    action_log: Entity<ActionLog>,
 649    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 650    send_task: Option<Task<()>>,
 651    connection: Rc<dyn AgentConnection>,
 652    session_id: acp::SessionId,
 653}
 654
 655pub enum AcpThreadEvent {
 656    NewEntry,
 657    EntryUpdated(usize),
 658    EntriesRemoved(Range<usize>),
 659    ToolAuthorizationRequired,
 660    Stopped,
 661    Error,
 662    ServerExited(ExitStatus),
 663}
 664
 665impl EventEmitter<AcpThreadEvent> for AcpThread {}
 666
 667#[derive(PartialEq, Eq)]
 668pub enum ThreadStatus {
 669    Idle,
 670    WaitingForToolConfirmation,
 671    Generating,
 672}
 673
 674#[derive(Debug, Clone)]
 675pub enum LoadError {
 676    Unsupported {
 677        error_message: SharedString,
 678        upgrade_message: SharedString,
 679        upgrade_command: String,
 680    },
 681    Exited(i32),
 682    Other(SharedString),
 683}
 684
 685impl Display for LoadError {
 686    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 687        match self {
 688            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 689            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 690            LoadError::Other(msg) => write!(f, "{}", msg),
 691        }
 692    }
 693}
 694
 695impl Error for LoadError {}
 696
 697impl AcpThread {
 698    pub fn new(
 699        title: impl Into<SharedString>,
 700        connection: Rc<dyn AgentConnection>,
 701        project: Entity<Project>,
 702        session_id: acp::SessionId,
 703        cx: &mut Context<Self>,
 704    ) -> Self {
 705        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 706
 707        Self {
 708            action_log,
 709            shared_buffers: Default::default(),
 710            entries: Default::default(),
 711            plan: Default::default(),
 712            title: title.into(),
 713            project,
 714            send_task: None,
 715            connection,
 716            session_id,
 717        }
 718    }
 719
 720    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 721        &self.connection
 722    }
 723
 724    pub fn action_log(&self) -> &Entity<ActionLog> {
 725        &self.action_log
 726    }
 727
 728    pub fn project(&self) -> &Entity<Project> {
 729        &self.project
 730    }
 731
 732    pub fn title(&self) -> SharedString {
 733        self.title.clone()
 734    }
 735
 736    pub fn entries(&self) -> &[AgentThreadEntry] {
 737        &self.entries
 738    }
 739
 740    pub fn session_id(&self) -> &acp::SessionId {
 741        &self.session_id
 742    }
 743
 744    pub fn status(&self) -> ThreadStatus {
 745        if self.send_task.is_some() {
 746            if self.waiting_for_tool_confirmation() {
 747                ThreadStatus::WaitingForToolConfirmation
 748            } else {
 749                ThreadStatus::Generating
 750            }
 751        } else {
 752            ThreadStatus::Idle
 753        }
 754    }
 755
 756    pub fn has_pending_edit_tool_calls(&self) -> bool {
 757        for entry in self.entries.iter().rev() {
 758            match entry {
 759                AgentThreadEntry::UserMessage(_) => return false,
 760                AgentThreadEntry::ToolCall(
 761                    call @ ToolCall {
 762                        status:
 763                            ToolCallStatus::Allowed {
 764                                status:
 765                                    acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
 766                            },
 767                        ..
 768                    },
 769                ) if call.diffs().next().is_some() => {
 770                    return true;
 771                }
 772                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 773            }
 774        }
 775
 776        false
 777    }
 778
 779    pub fn used_tools_since_last_user_message(&self) -> bool {
 780        for entry in self.entries.iter().rev() {
 781            match entry {
 782                AgentThreadEntry::UserMessage(..) => return false,
 783                AgentThreadEntry::AssistantMessage(..) => continue,
 784                AgentThreadEntry::ToolCall(..) => return true,
 785            }
 786        }
 787
 788        false
 789    }
 790
 791    pub fn handle_session_update(
 792        &mut self,
 793        update: acp::SessionUpdate,
 794        cx: &mut Context<Self>,
 795    ) -> Result<(), acp::Error> {
 796        match update {
 797            acp::SessionUpdate::UserMessageChunk { content } => {
 798                self.push_user_content_block(None, content, cx);
 799            }
 800            acp::SessionUpdate::AgentMessageChunk { content } => {
 801                self.push_assistant_content_block(content, false, cx);
 802            }
 803            acp::SessionUpdate::AgentThoughtChunk { content } => {
 804                self.push_assistant_content_block(content, true, cx);
 805            }
 806            acp::SessionUpdate::ToolCall(tool_call) => {
 807                self.upsert_tool_call(tool_call, cx)?;
 808            }
 809            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 810                self.update_tool_call(tool_call_update, cx)?;
 811            }
 812            acp::SessionUpdate::Plan(plan) => {
 813                self.update_plan(plan, cx);
 814            }
 815        }
 816        Ok(())
 817    }
 818
 819    pub fn push_user_content_block(
 820        &mut self,
 821        message_id: Option<UserMessageId>,
 822        chunk: acp::ContentBlock,
 823        cx: &mut Context<Self>,
 824    ) {
 825        let language_registry = self.project.read(cx).languages().clone();
 826        let entries_len = self.entries.len();
 827
 828        if let Some(last_entry) = self.entries.last_mut()
 829            && let AgentThreadEntry::UserMessage(UserMessage {
 830                id,
 831                content,
 832                chunks,
 833                ..
 834            }) = last_entry
 835        {
 836            *id = message_id.or(id.take());
 837            content.append(chunk.clone(), &language_registry, cx);
 838            chunks.push(chunk);
 839            let idx = entries_len - 1;
 840            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 841        } else {
 842            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 843            self.push_entry(
 844                AgentThreadEntry::UserMessage(UserMessage {
 845                    id: message_id,
 846                    content,
 847                    chunks: vec![chunk],
 848                    checkpoint: None,
 849                }),
 850                cx,
 851            );
 852        }
 853    }
 854
 855    pub fn push_assistant_content_block(
 856        &mut self,
 857        chunk: acp::ContentBlock,
 858        is_thought: bool,
 859        cx: &mut Context<Self>,
 860    ) {
 861        let language_registry = self.project.read(cx).languages().clone();
 862        let entries_len = self.entries.len();
 863        if let Some(last_entry) = self.entries.last_mut()
 864            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 865        {
 866            let idx = entries_len - 1;
 867            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 868            match (chunks.last_mut(), is_thought) {
 869                (Some(AssistantMessageChunk::Message { block }), false)
 870                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 871                    block.append(chunk, &language_registry, cx)
 872                }
 873                _ => {
 874                    let block = ContentBlock::new(chunk, &language_registry, cx);
 875                    if is_thought {
 876                        chunks.push(AssistantMessageChunk::Thought { block })
 877                    } else {
 878                        chunks.push(AssistantMessageChunk::Message { block })
 879                    }
 880                }
 881            }
 882        } else {
 883            let block = ContentBlock::new(chunk, &language_registry, cx);
 884            let chunk = if is_thought {
 885                AssistantMessageChunk::Thought { block }
 886            } else {
 887                AssistantMessageChunk::Message { block }
 888            };
 889
 890            self.push_entry(
 891                AgentThreadEntry::AssistantMessage(AssistantMessage {
 892                    chunks: vec![chunk],
 893                }),
 894                cx,
 895            );
 896        }
 897    }
 898
 899    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 900        self.entries.push(entry);
 901        cx.emit(AcpThreadEvent::NewEntry);
 902    }
 903
 904    pub fn update_tool_call(
 905        &mut self,
 906        update: impl Into<ToolCallUpdate>,
 907        cx: &mut Context<Self>,
 908    ) -> Result<()> {
 909        let update = update.into();
 910        let languages = self.project.read(cx).languages().clone();
 911
 912        let (ix, current_call) = self
 913            .tool_call_mut(update.id())
 914            .context("Tool call not found")?;
 915        match update {
 916            ToolCallUpdate::UpdateFields(update) => {
 917                let location_updated = update.fields.locations.is_some();
 918                current_call.update_fields(update.fields, languages, cx);
 919                if location_updated {
 920                    self.resolve_locations(update.id.clone(), cx);
 921                }
 922            }
 923            ToolCallUpdate::UpdateDiff(update) => {
 924                current_call.content.clear();
 925                current_call
 926                    .content
 927                    .push(ToolCallContent::Diff(update.diff));
 928            }
 929            ToolCallUpdate::UpdateTerminal(update) => {
 930                current_call.content.clear();
 931                current_call
 932                    .content
 933                    .push(ToolCallContent::Terminal(update.terminal));
 934            }
 935        }
 936
 937        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 938
 939        Ok(())
 940    }
 941
 942    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 943    pub fn upsert_tool_call(
 944        &mut self,
 945        tool_call: acp::ToolCall,
 946        cx: &mut Context<Self>,
 947    ) -> Result<(), acp::Error> {
 948        let status = ToolCallStatus::Allowed {
 949            status: tool_call.status,
 950        };
 951        self.upsert_tool_call_inner(tool_call.into(), status, cx)
 952    }
 953
 954    /// Fails if id does not match an existing entry.
 955    pub fn upsert_tool_call_inner(
 956        &mut self,
 957        tool_call_update: acp::ToolCallUpdate,
 958        status: ToolCallStatus,
 959        cx: &mut Context<Self>,
 960    ) -> Result<(), acp::Error> {
 961        let language_registry = self.project.read(cx).languages().clone();
 962        let id = tool_call_update.id.clone();
 963
 964        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
 965            current_call.update_fields(tool_call_update.fields, language_registry, cx);
 966            current_call.status = status;
 967
 968            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 969        } else {
 970            let call =
 971                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
 972            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 973        };
 974
 975        self.resolve_locations(id, cx);
 976        Ok(())
 977    }
 978
 979    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 980        // The tool call we are looking for is typically the last one, or very close to the end.
 981        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
 982        self.entries
 983            .iter_mut()
 984            .enumerate()
 985            .rev()
 986            .find_map(|(index, tool_call)| {
 987                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
 988                    && &tool_call.id == id
 989                {
 990                    Some((index, tool_call))
 991                } else {
 992                    None
 993                }
 994            })
 995    }
 996
 997    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
 998        let project = self.project.clone();
 999        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1000            return;
1001        };
1002        let task = tool_call.resolve_locations(project, cx);
1003        cx.spawn(async move |this, cx| {
1004            let resolved_locations = task.await;
1005            this.update(cx, |this, cx| {
1006                let project = this.project.clone();
1007                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1008                    return;
1009                };
1010                if let Some(Some(location)) = resolved_locations.last() {
1011                    project.update(cx, |project, cx| {
1012                        if let Some(agent_location) = project.agent_location() {
1013                            let should_ignore = agent_location.buffer == location.buffer
1014                                && location
1015                                    .buffer
1016                                    .update(cx, |buffer, _| {
1017                                        let snapshot = buffer.snapshot();
1018                                        let old_position =
1019                                            agent_location.position.to_point(&snapshot);
1020                                        let new_position = location.position.to_point(&snapshot);
1021                                        // ignore this so that when we get updates from the edit tool
1022                                        // the position doesn't reset to the startof line
1023                                        old_position.row == new_position.row
1024                                            && old_position.column > new_position.column
1025                                    })
1026                                    .ok()
1027                                    .unwrap_or_default();
1028                            if !should_ignore {
1029                                project.set_agent_location(Some(location.clone()), cx);
1030                            }
1031                        }
1032                    });
1033                }
1034                if tool_call.resolved_locations != resolved_locations {
1035                    tool_call.resolved_locations = resolved_locations;
1036                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1037                }
1038            })
1039        })
1040        .detach();
1041    }
1042
1043    pub fn request_tool_call_authorization(
1044        &mut self,
1045        tool_call: acp::ToolCallUpdate,
1046        options: Vec<acp::PermissionOption>,
1047        cx: &mut Context<Self>,
1048    ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1049        let (tx, rx) = oneshot::channel();
1050
1051        let status = ToolCallStatus::WaitingForConfirmation {
1052            options,
1053            respond_tx: tx,
1054        };
1055
1056        self.upsert_tool_call_inner(tool_call, status, cx)?;
1057        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1058        Ok(rx)
1059    }
1060
1061    pub fn authorize_tool_call(
1062        &mut self,
1063        id: acp::ToolCallId,
1064        option_id: acp::PermissionOptionId,
1065        option_kind: acp::PermissionOptionKind,
1066        cx: &mut Context<Self>,
1067    ) {
1068        let Some((ix, call)) = self.tool_call_mut(&id) else {
1069            return;
1070        };
1071
1072        let new_status = match option_kind {
1073            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1074                ToolCallStatus::Rejected
1075            }
1076            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1077                ToolCallStatus::Allowed {
1078                    status: acp::ToolCallStatus::InProgress,
1079                }
1080            }
1081        };
1082
1083        let curr_status = mem::replace(&mut call.status, new_status);
1084
1085        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1086            respond_tx.send(option_id).log_err();
1087        } else if cfg!(debug_assertions) {
1088            panic!("tried to authorize an already authorized tool call");
1089        }
1090
1091        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1092    }
1093
1094    /// Returns true if the last turn is awaiting tool authorization
1095    pub fn waiting_for_tool_confirmation(&self) -> bool {
1096        for entry in self.entries.iter().rev() {
1097            match &entry {
1098                AgentThreadEntry::ToolCall(call) => match call.status {
1099                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1100                    ToolCallStatus::Allowed { .. }
1101                    | ToolCallStatus::Rejected
1102                    | ToolCallStatus::Canceled => continue,
1103                },
1104                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1105                    // Reached the beginning of the turn
1106                    return false;
1107                }
1108            }
1109        }
1110        false
1111    }
1112
1113    pub fn plan(&self) -> &Plan {
1114        &self.plan
1115    }
1116
1117    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1118        let new_entries_len = request.entries.len();
1119        let mut new_entries = request.entries.into_iter();
1120
1121        // Reuse existing markdown to prevent flickering
1122        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1123            let PlanEntry {
1124                content,
1125                priority,
1126                status,
1127            } = old;
1128            content.update(cx, |old, cx| {
1129                old.replace(new.content, cx);
1130            });
1131            *priority = new.priority;
1132            *status = new.status;
1133        }
1134        for new in new_entries {
1135            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1136        }
1137        self.plan.entries.truncate(new_entries_len);
1138
1139        cx.notify();
1140    }
1141
1142    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1143        self.plan
1144            .entries
1145            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1146        cx.notify();
1147    }
1148
1149    #[cfg(any(test, feature = "test-support"))]
1150    pub fn send_raw(
1151        &mut self,
1152        message: &str,
1153        cx: &mut Context<Self>,
1154    ) -> BoxFuture<'static, Result<()>> {
1155        self.send(
1156            vec![acp::ContentBlock::Text(acp::TextContent {
1157                text: message.to_string(),
1158                annotations: None,
1159            })],
1160            cx,
1161        )
1162    }
1163
1164    pub fn send(
1165        &mut self,
1166        message: Vec<acp::ContentBlock>,
1167        cx: &mut Context<Self>,
1168    ) -> BoxFuture<'static, Result<()>> {
1169        let block = ContentBlock::new_combined(
1170            message.clone(),
1171            self.project.read(cx).languages().clone(),
1172            cx,
1173        );
1174        let request = acp::PromptRequest {
1175            prompt: message.clone(),
1176            session_id: self.session_id.clone(),
1177        };
1178        let git_store = self.project.read(cx).git_store().clone();
1179
1180        let message_id = if self
1181            .connection
1182            .session_editor(&self.session_id, cx)
1183            .is_some()
1184        {
1185            Some(UserMessageId::new())
1186        } else {
1187            None
1188        };
1189        self.push_entry(
1190            AgentThreadEntry::UserMessage(UserMessage {
1191                id: message_id.clone(),
1192                content: block,
1193                chunks: message,
1194                checkpoint: None,
1195            }),
1196            cx,
1197        );
1198
1199        self.run_turn(cx, async move |this, cx| {
1200            let old_checkpoint = git_store
1201                .update(cx, |git, cx| git.checkpoint(cx))?
1202                .await
1203                .context("failed to get old checkpoint")
1204                .log_err();
1205            this.update(cx, |this, cx| {
1206                if let Some((_ix, message)) = this.last_user_message() {
1207                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1208                        git_checkpoint,
1209                        show: false,
1210                    });
1211                }
1212                this.connection.prompt(message_id, request, cx)
1213            })?
1214            .await
1215        })
1216    }
1217
1218    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1219        self.run_turn(cx, async move |this, cx| {
1220            this.update(cx, |this, cx| {
1221                this.connection
1222                    .resume(&this.session_id, cx)
1223                    .map(|resume| resume.run(cx))
1224            })?
1225            .context("resuming a session is not supported")?
1226            .await
1227        })
1228    }
1229
1230    fn run_turn(
1231        &mut self,
1232        cx: &mut Context<Self>,
1233        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1234    ) -> BoxFuture<'static, Result<()>> {
1235        self.clear_completed_plan_entries(cx);
1236
1237        let (tx, rx) = oneshot::channel();
1238        let cancel_task = self.cancel(cx);
1239
1240        self.send_task = Some(cx.spawn(async move |this, cx| {
1241            cancel_task.await;
1242            tx.send(f(this, cx).await).ok();
1243        }));
1244
1245        cx.spawn(async move |this, cx| {
1246            let response = rx.await;
1247
1248            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1249                .await?;
1250
1251            this.update(cx, |this, cx| {
1252                match response {
1253                    Ok(Err(e)) => {
1254                        this.send_task.take();
1255                        cx.emit(AcpThreadEvent::Error);
1256                        Err(e)
1257                    }
1258                    result => {
1259                        let cancelled = matches!(
1260                            result,
1261                            Ok(Ok(acp::PromptResponse {
1262                                stop_reason: acp::StopReason::Cancelled
1263                            }))
1264                        );
1265
1266                        // We only take the task if the current prompt wasn't cancelled.
1267                        //
1268                        // This prompt may have been cancelled because another one was sent
1269                        // while it was still generating. In these cases, dropping `send_task`
1270                        // would cause the next generation to be cancelled.
1271                        if !cancelled {
1272                            this.send_task.take();
1273                        }
1274
1275                        cx.emit(AcpThreadEvent::Stopped);
1276                        Ok(())
1277                    }
1278                }
1279            })?
1280        })
1281        .boxed()
1282    }
1283
1284    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1285        let Some(send_task) = self.send_task.take() else {
1286            return Task::ready(());
1287        };
1288
1289        for entry in self.entries.iter_mut() {
1290            if let AgentThreadEntry::ToolCall(call) = entry {
1291                let cancel = matches!(
1292                    call.status,
1293                    ToolCallStatus::WaitingForConfirmation { .. }
1294                        | ToolCallStatus::Allowed {
1295                            status: acp::ToolCallStatus::InProgress
1296                        }
1297                );
1298
1299                if cancel {
1300                    call.status = ToolCallStatus::Canceled;
1301                }
1302            }
1303        }
1304
1305        self.connection.cancel(&self.session_id, cx);
1306
1307        // Wait for the send task to complete
1308        cx.foreground_executor().spawn(send_task)
1309    }
1310
1311    /// Rewinds this thread to before the entry at `index`, removing it and all
1312    /// subsequent entries while reverting any changes made from that point.
1313    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1314        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1315            return Task::ready(Err(anyhow!("not supported")));
1316        };
1317        let Some(message) = self.user_message(&id) else {
1318            return Task::ready(Err(anyhow!("message not found")));
1319        };
1320
1321        let checkpoint = message
1322            .checkpoint
1323            .as_ref()
1324            .map(|c| c.git_checkpoint.clone());
1325
1326        let git_store = self.project.read(cx).git_store().clone();
1327        cx.spawn(async move |this, cx| {
1328            if let Some(checkpoint) = checkpoint {
1329                git_store
1330                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1331                    .await?;
1332            }
1333
1334            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1335                .await?;
1336            this.update(cx, |this, cx| {
1337                if let Some((ix, _)) = this.user_message_mut(&id) {
1338                    let range = ix..this.entries.len();
1339                    this.entries.truncate(ix);
1340                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1341                }
1342            })
1343        })
1344    }
1345
1346    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1347        let git_store = self.project.read(cx).git_store().clone();
1348
1349        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1350            if let Some(checkpoint) = message.checkpoint.as_ref() {
1351                checkpoint.git_checkpoint.clone()
1352            } else {
1353                return Task::ready(Ok(()));
1354            }
1355        } else {
1356            return Task::ready(Ok(()));
1357        };
1358
1359        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1360        cx.spawn(async move |this, cx| {
1361            let new_checkpoint = new_checkpoint
1362                .await
1363                .context("failed to get new checkpoint")
1364                .log_err();
1365            if let Some(new_checkpoint) = new_checkpoint {
1366                let equal = git_store
1367                    .update(cx, |git, cx| {
1368                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1369                    })?
1370                    .await
1371                    .unwrap_or(true);
1372                this.update(cx, |this, cx| {
1373                    let (ix, message) = this.last_user_message().context("no user message")?;
1374                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1375                    checkpoint.show = !equal;
1376                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1377                    anyhow::Ok(())
1378                })??;
1379            }
1380
1381            Ok(())
1382        })
1383    }
1384
1385    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1386        self.entries
1387            .iter_mut()
1388            .enumerate()
1389            .rev()
1390            .find_map(|(ix, entry)| {
1391                if let AgentThreadEntry::UserMessage(message) = entry {
1392                    Some((ix, message))
1393                } else {
1394                    None
1395                }
1396            })
1397    }
1398
1399    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1400        self.entries.iter().find_map(|entry| {
1401            if let AgentThreadEntry::UserMessage(message) = entry {
1402                if message.id.as_ref() == Some(&id) {
1403                    Some(message)
1404                } else {
1405                    None
1406                }
1407            } else {
1408                None
1409            }
1410        })
1411    }
1412
1413    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1414        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1415            if let AgentThreadEntry::UserMessage(message) = entry {
1416                if message.id.as_ref() == Some(&id) {
1417                    Some((ix, message))
1418                } else {
1419                    None
1420                }
1421            } else {
1422                None
1423            }
1424        })
1425    }
1426
1427    pub fn read_text_file(
1428        &self,
1429        path: PathBuf,
1430        line: Option<u32>,
1431        limit: Option<u32>,
1432        reuse_shared_snapshot: bool,
1433        cx: &mut Context<Self>,
1434    ) -> Task<Result<String>> {
1435        let project = self.project.clone();
1436        let action_log = self.action_log.clone();
1437        cx.spawn(async move |this, cx| {
1438            let load = project.update(cx, |project, cx| {
1439                let path = project
1440                    .project_path_for_absolute_path(&path, cx)
1441                    .context("invalid path")?;
1442                anyhow::Ok(project.open_buffer(path, cx))
1443            });
1444            let buffer = load??.await?;
1445
1446            let snapshot = if reuse_shared_snapshot {
1447                this.read_with(cx, |this, _| {
1448                    this.shared_buffers.get(&buffer.clone()).cloned()
1449                })
1450                .log_err()
1451                .flatten()
1452            } else {
1453                None
1454            };
1455
1456            let snapshot = if let Some(snapshot) = snapshot {
1457                snapshot
1458            } else {
1459                action_log.update(cx, |action_log, cx| {
1460                    action_log.buffer_read(buffer.clone(), cx);
1461                })?;
1462                project.update(cx, |project, cx| {
1463                    let position = buffer
1464                        .read(cx)
1465                        .snapshot()
1466                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1467                    project.set_agent_location(
1468                        Some(AgentLocation {
1469                            buffer: buffer.downgrade(),
1470                            position,
1471                        }),
1472                        cx,
1473                    );
1474                })?;
1475
1476                buffer.update(cx, |buffer, _| buffer.snapshot())?
1477            };
1478
1479            this.update(cx, |this, _| {
1480                let text = snapshot.text();
1481                this.shared_buffers.insert(buffer.clone(), snapshot);
1482                if line.is_none() && limit.is_none() {
1483                    return Ok(text);
1484                }
1485                let limit = limit.unwrap_or(u32::MAX) as usize;
1486                let Some(line) = line else {
1487                    return Ok(text.lines().take(limit).collect::<String>());
1488                };
1489
1490                let count = text.lines().count();
1491                if count < line as usize {
1492                    anyhow::bail!("There are only {} lines", count);
1493                }
1494                Ok(text
1495                    .lines()
1496                    .skip(line as usize + 1)
1497                    .take(limit)
1498                    .collect::<String>())
1499            })?
1500        })
1501    }
1502
1503    pub fn write_text_file(
1504        &self,
1505        path: PathBuf,
1506        content: String,
1507        cx: &mut Context<Self>,
1508    ) -> Task<Result<()>> {
1509        let project = self.project.clone();
1510        let action_log = self.action_log.clone();
1511        cx.spawn(async move |this, cx| {
1512            let load = project.update(cx, |project, cx| {
1513                let path = project
1514                    .project_path_for_absolute_path(&path, cx)
1515                    .context("invalid path")?;
1516                anyhow::Ok(project.open_buffer(path, cx))
1517            });
1518            let buffer = load??.await?;
1519            let snapshot = this.update(cx, |this, cx| {
1520                this.shared_buffers
1521                    .get(&buffer)
1522                    .cloned()
1523                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1524            })?;
1525            let edits = cx
1526                .background_executor()
1527                .spawn(async move {
1528                    let old_text = snapshot.text();
1529                    text_diff(old_text.as_str(), &content)
1530                        .into_iter()
1531                        .map(|(range, replacement)| {
1532                            (
1533                                snapshot.anchor_after(range.start)
1534                                    ..snapshot.anchor_before(range.end),
1535                                replacement,
1536                            )
1537                        })
1538                        .collect::<Vec<_>>()
1539                })
1540                .await;
1541            cx.update(|cx| {
1542                project.update(cx, |project, cx| {
1543                    project.set_agent_location(
1544                        Some(AgentLocation {
1545                            buffer: buffer.downgrade(),
1546                            position: edits
1547                                .last()
1548                                .map(|(range, _)| range.end)
1549                                .unwrap_or(Anchor::MIN),
1550                        }),
1551                        cx,
1552                    );
1553                });
1554
1555                action_log.update(cx, |action_log, cx| {
1556                    action_log.buffer_read(buffer.clone(), cx);
1557                });
1558                buffer.update(cx, |buffer, cx| {
1559                    buffer.edit(edits, None, cx);
1560                });
1561                action_log.update(cx, |action_log, cx| {
1562                    action_log.buffer_edited(buffer.clone(), cx);
1563                });
1564            })?;
1565            project
1566                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1567                .await
1568        })
1569    }
1570
1571    pub fn to_markdown(&self, cx: &App) -> String {
1572        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1573    }
1574
1575    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1576        cx.emit(AcpThreadEvent::ServerExited(status));
1577    }
1578}
1579
1580fn markdown_for_raw_output(
1581    raw_output: &serde_json::Value,
1582    language_registry: &Arc<LanguageRegistry>,
1583    cx: &mut App,
1584) -> Option<Entity<Markdown>> {
1585    match raw_output {
1586        serde_json::Value::Null => None,
1587        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1588            Markdown::new(
1589                value.to_string().into(),
1590                Some(language_registry.clone()),
1591                None,
1592                cx,
1593            )
1594        })),
1595        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1596            Markdown::new(
1597                value.to_string().into(),
1598                Some(language_registry.clone()),
1599                None,
1600                cx,
1601            )
1602        })),
1603        serde_json::Value::String(value) => Some(cx.new(|cx| {
1604            Markdown::new(
1605                value.clone().into(),
1606                Some(language_registry.clone()),
1607                None,
1608                cx,
1609            )
1610        })),
1611        value => Some(cx.new(|cx| {
1612            Markdown::new(
1613                format!("```json\n{}\n```", value).into(),
1614                Some(language_registry.clone()),
1615                None,
1616                cx,
1617            )
1618        })),
1619    }
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624    use super::*;
1625    use anyhow::anyhow;
1626    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1627    use gpui::{AsyncApp, TestAppContext, WeakEntity};
1628    use indoc::indoc;
1629    use project::{FakeFs, Fs};
1630    use rand::Rng as _;
1631    use serde_json::json;
1632    use settings::SettingsStore;
1633    use smol::stream::StreamExt as _;
1634    use std::{
1635        any::Any,
1636        cell::RefCell,
1637        path::Path,
1638        rc::Rc,
1639        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1640        time::Duration,
1641    };
1642    use util::path;
1643
1644    fn init_test(cx: &mut TestAppContext) {
1645        env_logger::try_init().ok();
1646        cx.update(|cx| {
1647            let settings_store = SettingsStore::test(cx);
1648            cx.set_global(settings_store);
1649            Project::init_settings(cx);
1650            language::init(cx);
1651        });
1652    }
1653
1654    #[gpui::test]
1655    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1656        init_test(cx);
1657
1658        let fs = FakeFs::new(cx.executor());
1659        let project = Project::test(fs, [], cx).await;
1660        let connection = Rc::new(FakeAgentConnection::new());
1661        let thread = cx
1662            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1663            .await
1664            .unwrap();
1665
1666        // Test creating a new user message
1667        thread.update(cx, |thread, cx| {
1668            thread.push_user_content_block(
1669                None,
1670                acp::ContentBlock::Text(acp::TextContent {
1671                    annotations: None,
1672                    text: "Hello, ".to_string(),
1673                }),
1674                cx,
1675            );
1676        });
1677
1678        thread.update(cx, |thread, cx| {
1679            assert_eq!(thread.entries.len(), 1);
1680            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1681                assert_eq!(user_msg.id, None);
1682                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1683            } else {
1684                panic!("Expected UserMessage");
1685            }
1686        });
1687
1688        // Test appending to existing user message
1689        let message_1_id = UserMessageId::new();
1690        thread.update(cx, |thread, cx| {
1691            thread.push_user_content_block(
1692                Some(message_1_id.clone()),
1693                acp::ContentBlock::Text(acp::TextContent {
1694                    annotations: None,
1695                    text: "world!".to_string(),
1696                }),
1697                cx,
1698            );
1699        });
1700
1701        thread.update(cx, |thread, cx| {
1702            assert_eq!(thread.entries.len(), 1);
1703            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1704                assert_eq!(user_msg.id, Some(message_1_id));
1705                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1706            } else {
1707                panic!("Expected UserMessage");
1708            }
1709        });
1710
1711        // Test creating new user message after assistant message
1712        thread.update(cx, |thread, cx| {
1713            thread.push_assistant_content_block(
1714                acp::ContentBlock::Text(acp::TextContent {
1715                    annotations: None,
1716                    text: "Assistant response".to_string(),
1717                }),
1718                false,
1719                cx,
1720            );
1721        });
1722
1723        let message_2_id = UserMessageId::new();
1724        thread.update(cx, |thread, cx| {
1725            thread.push_user_content_block(
1726                Some(message_2_id.clone()),
1727                acp::ContentBlock::Text(acp::TextContent {
1728                    annotations: None,
1729                    text: "New user message".to_string(),
1730                }),
1731                cx,
1732            );
1733        });
1734
1735        thread.update(cx, |thread, cx| {
1736            assert_eq!(thread.entries.len(), 3);
1737            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1738                assert_eq!(user_msg.id, Some(message_2_id));
1739                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1740            } else {
1741                panic!("Expected UserMessage at index 2");
1742            }
1743        });
1744    }
1745
1746    #[gpui::test]
1747    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1748        init_test(cx);
1749
1750        let fs = FakeFs::new(cx.executor());
1751        let project = Project::test(fs, [], cx).await;
1752        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1753            |_, thread, mut cx| {
1754                async move {
1755                    thread.update(&mut cx, |thread, cx| {
1756                        thread
1757                            .handle_session_update(
1758                                acp::SessionUpdate::AgentThoughtChunk {
1759                                    content: "Thinking ".into(),
1760                                },
1761                                cx,
1762                            )
1763                            .unwrap();
1764                        thread
1765                            .handle_session_update(
1766                                acp::SessionUpdate::AgentThoughtChunk {
1767                                    content: "hard!".into(),
1768                                },
1769                                cx,
1770                            )
1771                            .unwrap();
1772                    })?;
1773                    Ok(acp::PromptResponse {
1774                        stop_reason: acp::StopReason::EndTurn,
1775                    })
1776                }
1777                .boxed_local()
1778            },
1779        ));
1780
1781        let thread = cx
1782            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1783            .await
1784            .unwrap();
1785
1786        thread
1787            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1788            .await
1789            .unwrap();
1790
1791        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1792        assert_eq!(
1793            output,
1794            indoc! {r#"
1795            ## User
1796
1797            Hello from Zed!
1798
1799            ## Assistant
1800
1801            <thinking>
1802            Thinking hard!
1803            </thinking>
1804
1805            "#}
1806        );
1807    }
1808
1809    #[gpui::test]
1810    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1811        init_test(cx);
1812
1813        let fs = FakeFs::new(cx.executor());
1814        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1815            .await;
1816        let project = Project::test(fs.clone(), [], cx).await;
1817        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1818        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1819        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1820            move |_, thread, mut cx| {
1821                let read_file_tx = read_file_tx.clone();
1822                async move {
1823                    let content = thread
1824                        .update(&mut cx, |thread, cx| {
1825                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1826                        })
1827                        .unwrap()
1828                        .await
1829                        .unwrap();
1830                    assert_eq!(content, "one\ntwo\nthree\n");
1831                    read_file_tx.take().unwrap().send(()).unwrap();
1832                    thread
1833                        .update(&mut cx, |thread, cx| {
1834                            thread.write_text_file(
1835                                path!("/tmp/foo").into(),
1836                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1837                                cx,
1838                            )
1839                        })
1840                        .unwrap()
1841                        .await
1842                        .unwrap();
1843                    Ok(acp::PromptResponse {
1844                        stop_reason: acp::StopReason::EndTurn,
1845                    })
1846                }
1847                .boxed_local()
1848            },
1849        ));
1850
1851        let (worktree, pathbuf) = project
1852            .update(cx, |project, cx| {
1853                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1854            })
1855            .await
1856            .unwrap();
1857        let buffer = project
1858            .update(cx, |project, cx| {
1859                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1860            })
1861            .await
1862            .unwrap();
1863
1864        let thread = cx
1865            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1866            .await
1867            .unwrap();
1868
1869        let request = thread.update(cx, |thread, cx| {
1870            thread.send_raw("Extend the count in /tmp/foo", cx)
1871        });
1872        read_file_rx.await.ok();
1873        buffer.update(cx, |buffer, cx| {
1874            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1875        });
1876        cx.run_until_parked();
1877        assert_eq!(
1878            buffer.read_with(cx, |buffer, _| buffer.text()),
1879            "zero\none\ntwo\nthree\nfour\nfive\n"
1880        );
1881        assert_eq!(
1882            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1883            "zero\none\ntwo\nthree\nfour\nfive\n"
1884        );
1885        request.await.unwrap();
1886    }
1887
1888    #[gpui::test]
1889    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1890        init_test(cx);
1891
1892        let fs = FakeFs::new(cx.executor());
1893        let project = Project::test(fs, [], cx).await;
1894        let id = acp::ToolCallId("test".into());
1895
1896        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1897            let id = id.clone();
1898            move |_, thread, mut cx| {
1899                let id = id.clone();
1900                async move {
1901                    thread
1902                        .update(&mut cx, |thread, cx| {
1903                            thread.handle_session_update(
1904                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1905                                    id: id.clone(),
1906                                    title: "Label".into(),
1907                                    kind: acp::ToolKind::Fetch,
1908                                    status: acp::ToolCallStatus::InProgress,
1909                                    content: vec![],
1910                                    locations: vec![],
1911                                    raw_input: None,
1912                                    raw_output: None,
1913                                }),
1914                                cx,
1915                            )
1916                        })
1917                        .unwrap()
1918                        .unwrap();
1919                    Ok(acp::PromptResponse {
1920                        stop_reason: acp::StopReason::EndTurn,
1921                    })
1922                }
1923                .boxed_local()
1924            }
1925        }));
1926
1927        let thread = cx
1928            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1929            .await
1930            .unwrap();
1931
1932        let request = thread.update(cx, |thread, cx| {
1933            thread.send_raw("Fetch https://example.com", cx)
1934        });
1935
1936        run_until_first_tool_call(&thread, cx).await;
1937
1938        thread.read_with(cx, |thread, _| {
1939            assert!(matches!(
1940                thread.entries[1],
1941                AgentThreadEntry::ToolCall(ToolCall {
1942                    status: ToolCallStatus::Allowed {
1943                        status: acp::ToolCallStatus::InProgress,
1944                        ..
1945                    },
1946                    ..
1947                })
1948            ));
1949        });
1950
1951        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1952
1953        thread.read_with(cx, |thread, _| {
1954            assert!(matches!(
1955                &thread.entries[1],
1956                AgentThreadEntry::ToolCall(ToolCall {
1957                    status: ToolCallStatus::Canceled,
1958                    ..
1959                })
1960            ));
1961        });
1962
1963        thread
1964            .update(cx, |thread, cx| {
1965                thread.handle_session_update(
1966                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1967                        id,
1968                        fields: acp::ToolCallUpdateFields {
1969                            status: Some(acp::ToolCallStatus::Completed),
1970                            ..Default::default()
1971                        },
1972                    }),
1973                    cx,
1974                )
1975            })
1976            .unwrap();
1977
1978        request.await.unwrap();
1979
1980        thread.read_with(cx, |thread, _| {
1981            assert!(matches!(
1982                thread.entries[1],
1983                AgentThreadEntry::ToolCall(ToolCall {
1984                    status: ToolCallStatus::Allowed {
1985                        status: acp::ToolCallStatus::Completed,
1986                        ..
1987                    },
1988                    ..
1989                })
1990            ));
1991        });
1992    }
1993
1994    #[gpui::test]
1995    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
1996        init_test(cx);
1997        let fs = FakeFs::new(cx.background_executor.clone());
1998        fs.insert_tree(path!("/test"), json!({})).await;
1999        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2000
2001        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2002            move |_, thread, mut cx| {
2003                async move {
2004                    thread
2005                        .update(&mut cx, |thread, cx| {
2006                            thread.handle_session_update(
2007                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2008                                    id: acp::ToolCallId("test".into()),
2009                                    title: "Label".into(),
2010                                    kind: acp::ToolKind::Edit,
2011                                    status: acp::ToolCallStatus::Completed,
2012                                    content: vec![acp::ToolCallContent::Diff {
2013                                        diff: acp::Diff {
2014                                            path: "/test/test.txt".into(),
2015                                            old_text: None,
2016                                            new_text: "foo".into(),
2017                                        },
2018                                    }],
2019                                    locations: vec![],
2020                                    raw_input: None,
2021                                    raw_output: None,
2022                                }),
2023                                cx,
2024                            )
2025                        })
2026                        .unwrap()
2027                        .unwrap();
2028                    Ok(acp::PromptResponse {
2029                        stop_reason: acp::StopReason::EndTurn,
2030                    })
2031                }
2032                .boxed_local()
2033            }
2034        }));
2035
2036        let thread = cx
2037            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2038            .await
2039            .unwrap();
2040
2041        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2042            .await
2043            .unwrap();
2044
2045        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2046    }
2047
2048    #[gpui::test(iterations = 10)]
2049    async fn test_checkpoints(cx: &mut TestAppContext) {
2050        init_test(cx);
2051        let fs = FakeFs::new(cx.background_executor.clone());
2052        fs.insert_tree(
2053            path!("/test"),
2054            json!({
2055                ".git": {}
2056            }),
2057        )
2058        .await;
2059        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2060
2061        let simulate_changes = Arc::new(AtomicBool::new(true));
2062        let next_filename = Arc::new(AtomicUsize::new(0));
2063        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2064            let simulate_changes = simulate_changes.clone();
2065            let next_filename = next_filename.clone();
2066            let fs = fs.clone();
2067            move |request, thread, mut cx| {
2068                let fs = fs.clone();
2069                let simulate_changes = simulate_changes.clone();
2070                let next_filename = next_filename.clone();
2071                async move {
2072                    if simulate_changes.load(SeqCst) {
2073                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2074                        fs.write(Path::new(&filename), b"").await?;
2075                    }
2076
2077                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2078                        panic!("expected text content block");
2079                    };
2080                    thread.update(&mut cx, |thread, cx| {
2081                        thread
2082                            .handle_session_update(
2083                                acp::SessionUpdate::AgentMessageChunk {
2084                                    content: content.text.to_uppercase().into(),
2085                                },
2086                                cx,
2087                            )
2088                            .unwrap();
2089                    })?;
2090                    Ok(acp::PromptResponse {
2091                        stop_reason: acp::StopReason::EndTurn,
2092                    })
2093                }
2094                .boxed_local()
2095            }
2096        }));
2097        let thread = cx
2098            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2099            .await
2100            .unwrap();
2101
2102        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2103            .await
2104            .unwrap();
2105        thread.read_with(cx, |thread, cx| {
2106            assert_eq!(
2107                thread.to_markdown(cx),
2108                indoc! {"
2109                    ## User (checkpoint)
2110
2111                    Lorem
2112
2113                    ## Assistant
2114
2115                    LOREM
2116
2117                "}
2118            );
2119        });
2120        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2121
2122        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2123            .await
2124            .unwrap();
2125        thread.read_with(cx, |thread, cx| {
2126            assert_eq!(
2127                thread.to_markdown(cx),
2128                indoc! {"
2129                    ## User (checkpoint)
2130
2131                    Lorem
2132
2133                    ## Assistant
2134
2135                    LOREM
2136
2137                    ## User (checkpoint)
2138
2139                    ipsum
2140
2141                    ## Assistant
2142
2143                    IPSUM
2144
2145                "}
2146            );
2147        });
2148        assert_eq!(
2149            fs.files(),
2150            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2151        );
2152
2153        // Checkpoint isn't stored when there are no changes.
2154        simulate_changes.store(false, SeqCst);
2155        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2156            .await
2157            .unwrap();
2158        thread.read_with(cx, |thread, cx| {
2159            assert_eq!(
2160                thread.to_markdown(cx),
2161                indoc! {"
2162                    ## User (checkpoint)
2163
2164                    Lorem
2165
2166                    ## Assistant
2167
2168                    LOREM
2169
2170                    ## User (checkpoint)
2171
2172                    ipsum
2173
2174                    ## Assistant
2175
2176                    IPSUM
2177
2178                    ## User
2179
2180                    dolor
2181
2182                    ## Assistant
2183
2184                    DOLOR
2185
2186                "}
2187            );
2188        });
2189        assert_eq!(
2190            fs.files(),
2191            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2192        );
2193
2194        // Rewinding the conversation truncates the history and restores the checkpoint.
2195        thread
2196            .update(cx, |thread, cx| {
2197                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2198                    panic!("unexpected entries {:?}", thread.entries)
2199                };
2200                thread.rewind(message.id.clone().unwrap(), cx)
2201            })
2202            .await
2203            .unwrap();
2204        thread.read_with(cx, |thread, cx| {
2205            assert_eq!(
2206                thread.to_markdown(cx),
2207                indoc! {"
2208                    ## User (checkpoint)
2209
2210                    Lorem
2211
2212                    ## Assistant
2213
2214                    LOREM
2215
2216                "}
2217            );
2218        });
2219        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2220    }
2221
2222    async fn run_until_first_tool_call(
2223        thread: &Entity<AcpThread>,
2224        cx: &mut TestAppContext,
2225    ) -> usize {
2226        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2227
2228        let subscription = cx.update(|cx| {
2229            cx.subscribe(thread, move |thread, _, cx| {
2230                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2231                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2232                        return tx.try_send(ix).unwrap();
2233                    }
2234                }
2235            })
2236        });
2237
2238        select! {
2239            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2240                panic!("Timeout waiting for tool call")
2241            }
2242            ix = rx.next().fuse() => {
2243                drop(subscription);
2244                ix.unwrap()
2245            }
2246        }
2247    }
2248
2249    #[derive(Clone, Default)]
2250    struct FakeAgentConnection {
2251        auth_methods: Vec<acp::AuthMethod>,
2252        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2253        on_user_message: Option<
2254            Rc<
2255                dyn Fn(
2256                        acp::PromptRequest,
2257                        WeakEntity<AcpThread>,
2258                        AsyncApp,
2259                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2260                    + 'static,
2261            >,
2262        >,
2263    }
2264
2265    impl FakeAgentConnection {
2266        fn new() -> Self {
2267            Self {
2268                auth_methods: Vec::new(),
2269                on_user_message: None,
2270                sessions: Arc::default(),
2271            }
2272        }
2273
2274        #[expect(unused)]
2275        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2276            self.auth_methods = auth_methods;
2277            self
2278        }
2279
2280        fn on_user_message(
2281            mut self,
2282            handler: impl Fn(
2283                acp::PromptRequest,
2284                WeakEntity<AcpThread>,
2285                AsyncApp,
2286            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2287            + 'static,
2288        ) -> Self {
2289            self.on_user_message.replace(Rc::new(handler));
2290            self
2291        }
2292    }
2293
2294    impl AgentConnection for FakeAgentConnection {
2295        fn auth_methods(&self) -> &[acp::AuthMethod] {
2296            &self.auth_methods
2297        }
2298
2299        fn new_thread(
2300            self: Rc<Self>,
2301            project: Entity<Project>,
2302            _cwd: &Path,
2303            cx: &mut gpui::App,
2304        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2305            let session_id = acp::SessionId(
2306                rand::thread_rng()
2307                    .sample_iter(&rand::distributions::Alphanumeric)
2308                    .take(7)
2309                    .map(char::from)
2310                    .collect::<String>()
2311                    .into(),
2312            );
2313            let thread =
2314                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2315            self.sessions.lock().insert(session_id, thread.downgrade());
2316            Task::ready(Ok(thread))
2317        }
2318
2319        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2320            if self.auth_methods().iter().any(|m| m.id == method) {
2321                Task::ready(Ok(()))
2322            } else {
2323                Task::ready(Err(anyhow!("Invalid Auth Method")))
2324            }
2325        }
2326
2327        fn prompt(
2328            &self,
2329            _id: Option<UserMessageId>,
2330            params: acp::PromptRequest,
2331            cx: &mut App,
2332        ) -> Task<gpui::Result<acp::PromptResponse>> {
2333            let sessions = self.sessions.lock();
2334            let thread = sessions.get(&params.session_id).unwrap();
2335            if let Some(handler) = &self.on_user_message {
2336                let handler = handler.clone();
2337                let thread = thread.clone();
2338                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2339            } else {
2340                Task::ready(Ok(acp::PromptResponse {
2341                    stop_reason: acp::StopReason::EndTurn,
2342                }))
2343            }
2344        }
2345
2346        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2347            let sessions = self.sessions.lock();
2348            let thread = sessions.get(&session_id).unwrap().clone();
2349
2350            cx.spawn(async move |cx| {
2351                thread
2352                    .update(cx, |thread, cx| thread.cancel(cx))
2353                    .unwrap()
2354                    .await
2355            })
2356            .detach();
2357        }
2358
2359        fn session_editor(
2360            &self,
2361            session_id: &acp::SessionId,
2362            _cx: &mut App,
2363        ) -> Option<Rc<dyn AgentSessionEditor>> {
2364            Some(Rc::new(FakeAgentSessionEditor {
2365                _session_id: session_id.clone(),
2366            }))
2367        }
2368
2369        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2370            self
2371        }
2372    }
2373
2374    struct FakeAgentSessionEditor {
2375        _session_id: acp::SessionId,
2376    }
2377
2378    impl AgentSessionEditor for FakeAgentSessionEditor {
2379        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2380            Task::ready(Ok(()))
2381        }
2382    }
2383}