1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6pub use connection::*;
7pub use diff::*;
8pub use mention::*;
9use serde::{Deserialize, Serialize};
10pub use terminal::*;
11
12use action_log::ActionLog;
13use agent_client_protocol as acp;
14use anyhow::{Context as _, Result, anyhow};
15use editor::Bias;
16use futures::{FutureExt, channel::oneshot, future::BoxFuture};
17use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
18use itertools::Itertools;
19use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
20use markdown::Markdown;
21use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use ui::App;
31use util::ResultExt;
32
33#[derive(Debug)]
34pub struct UserMessage {
35 pub id: Option<UserMessageId>,
36 pub content: ContentBlock,
37 pub chunks: Vec<acp::ContentBlock>,
38 pub checkpoint: Option<Checkpoint>,
39}
40
41#[derive(Debug)]
42pub struct Checkpoint {
43 git_checkpoint: GitStoreCheckpoint,
44 pub show: bool,
45}
46
47impl UserMessage {
48 fn to_markdown(&self, cx: &App) -> String {
49 let mut markdown = String::new();
50 if self
51 .checkpoint
52 .as_ref()
53 .is_some_and(|checkpoint| checkpoint.show)
54 {
55 writeln!(markdown, "## User (checkpoint)").unwrap();
56 } else {
57 writeln!(markdown, "## User").unwrap();
58 }
59 writeln!(markdown).unwrap();
60 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
61 writeln!(markdown).unwrap();
62 markdown
63 }
64}
65
66#[derive(Debug, PartialEq)]
67pub struct AssistantMessage {
68 pub chunks: Vec<AssistantMessageChunk>,
69}
70
71impl AssistantMessage {
72 pub fn to_markdown(&self, cx: &App) -> String {
73 format!(
74 "## Assistant\n\n{}\n\n",
75 self.chunks
76 .iter()
77 .map(|chunk| chunk.to_markdown(cx))
78 .join("\n\n")
79 )
80 }
81}
82
83#[derive(Debug, PartialEq)]
84pub enum AssistantMessageChunk {
85 Message { block: ContentBlock },
86 Thought { block: ContentBlock },
87}
88
89impl AssistantMessageChunk {
90 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
91 Self::Message {
92 block: ContentBlock::new(chunk.into(), language_registry, cx),
93 }
94 }
95
96 fn to_markdown(&self, cx: &App) -> String {
97 match self {
98 Self::Message { block } => block.to_markdown(cx).to_string(),
99 Self::Thought { block } => {
100 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
101 }
102 }
103 }
104}
105
106#[derive(Debug)]
107pub enum AgentThreadEntry {
108 UserMessage(UserMessage),
109 AssistantMessage(AssistantMessage),
110 ToolCall(ToolCall),
111}
112
113impl AgentThreadEntry {
114 pub fn to_markdown(&self, cx: &App) -> String {
115 match self {
116 Self::UserMessage(message) => message.to_markdown(cx),
117 Self::AssistantMessage(message) => message.to_markdown(cx),
118 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
119 }
120 }
121
122 pub fn user_message(&self) -> Option<&UserMessage> {
123 if let AgentThreadEntry::UserMessage(message) = self {
124 Some(message)
125 } else {
126 None
127 }
128 }
129
130 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
131 if let AgentThreadEntry::ToolCall(call) = self {
132 itertools::Either::Left(call.diffs())
133 } else {
134 itertools::Either::Right(std::iter::empty())
135 }
136 }
137
138 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
139 if let AgentThreadEntry::ToolCall(call) = self {
140 itertools::Either::Left(call.terminals())
141 } else {
142 itertools::Either::Right(std::iter::empty())
143 }
144 }
145
146 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
147 if let AgentThreadEntry::ToolCall(ToolCall {
148 locations,
149 resolved_locations,
150 ..
151 }) = self
152 {
153 Some((
154 locations.get(ix)?.clone(),
155 resolved_locations.get(ix)?.clone()?,
156 ))
157 } else {
158 None
159 }
160 }
161}
162
163#[derive(Debug)]
164pub struct ToolCall {
165 pub id: acp::ToolCallId,
166 pub label: Entity<Markdown>,
167 pub kind: acp::ToolKind,
168 pub content: Vec<ToolCallContent>,
169 pub status: ToolCallStatus,
170 pub locations: Vec<acp::ToolCallLocation>,
171 pub resolved_locations: Vec<Option<AgentLocation>>,
172 pub raw_input: Option<serde_json::Value>,
173 pub raw_output: Option<serde_json::Value>,
174}
175
176impl ToolCall {
177 fn from_acp(
178 tool_call: acp::ToolCall,
179 status: ToolCallStatus,
180 language_registry: Arc<LanguageRegistry>,
181 cx: &mut App,
182 ) -> Self {
183 Self {
184 id: tool_call.id,
185 label: cx.new(|cx| {
186 Markdown::new(
187 tool_call.title.into(),
188 Some(language_registry.clone()),
189 None,
190 cx,
191 )
192 }),
193 kind: tool_call.kind,
194 content: tool_call
195 .content
196 .into_iter()
197 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
198 .collect(),
199 locations: tool_call.locations,
200 resolved_locations: Vec::default(),
201 status,
202 raw_input: tool_call.raw_input,
203 raw_output: tool_call.raw_output,
204 }
205 }
206
207 fn update_fields(
208 &mut self,
209 fields: acp::ToolCallUpdateFields,
210 language_registry: Arc<LanguageRegistry>,
211 cx: &mut App,
212 ) {
213 let acp::ToolCallUpdateFields {
214 kind,
215 status,
216 title,
217 content,
218 locations,
219 raw_input,
220 raw_output,
221 } = fields;
222
223 if let Some(kind) = kind {
224 self.kind = kind;
225 }
226
227 if let Some(status) = status {
228 self.status = status.into();
229 }
230
231 if let Some(title) = title {
232 self.label.update(cx, |label, cx| {
233 label.replace(title, cx);
234 });
235 }
236
237 if let Some(content) = content {
238 self.content = content
239 .into_iter()
240 .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
241 .collect();
242 }
243
244 if let Some(locations) = locations {
245 self.locations = locations;
246 }
247
248 if let Some(raw_input) = raw_input {
249 self.raw_input = Some(raw_input);
250 }
251
252 if let Some(raw_output) = raw_output {
253 if self.content.is_empty()
254 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
255 {
256 self.content
257 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
258 markdown,
259 }));
260 }
261 self.raw_output = Some(raw_output);
262 }
263 }
264
265 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
266 self.content.iter().filter_map(|content| match content {
267 ToolCallContent::Diff(diff) => Some(diff),
268 ToolCallContent::ContentBlock(_) => None,
269 ToolCallContent::Terminal(_) => None,
270 })
271 }
272
273 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
274 self.content.iter().filter_map(|content| match content {
275 ToolCallContent::Terminal(terminal) => Some(terminal),
276 ToolCallContent::ContentBlock(_) => None,
277 ToolCallContent::Diff(_) => None,
278 })
279 }
280
281 fn to_markdown(&self, cx: &App) -> String {
282 let mut markdown = format!(
283 "**Tool Call: {}**\nStatus: {}\n\n",
284 self.label.read(cx).source(),
285 self.status
286 );
287 for content in &self.content {
288 markdown.push_str(content.to_markdown(cx).as_str());
289 markdown.push_str("\n\n");
290 }
291 markdown
292 }
293
294 async fn resolve_location(
295 location: acp::ToolCallLocation,
296 project: WeakEntity<Project>,
297 cx: &mut AsyncApp,
298 ) -> Option<AgentLocation> {
299 let buffer = project
300 .update(cx, |project, cx| {
301 if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
302 Some(project.open_buffer(path, cx))
303 } else {
304 None
305 }
306 })
307 .ok()??;
308 let buffer = buffer.await.log_err()?;
309 let position = buffer
310 .update(cx, |buffer, _| {
311 if let Some(row) = location.line {
312 let snapshot = buffer.snapshot();
313 let column = snapshot.indent_size_for_line(row).len;
314 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
315 snapshot.anchor_before(point)
316 } else {
317 Anchor::MIN
318 }
319 })
320 .ok()?;
321
322 Some(AgentLocation {
323 buffer: buffer.downgrade(),
324 position,
325 })
326 }
327
328 fn resolve_locations(
329 &self,
330 project: Entity<Project>,
331 cx: &mut App,
332 ) -> Task<Vec<Option<AgentLocation>>> {
333 let locations = self.locations.clone();
334 project.update(cx, |_, cx| {
335 cx.spawn(async move |project, cx| {
336 let mut new_locations = Vec::new();
337 for location in locations {
338 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
339 }
340 new_locations
341 })
342 })
343 }
344}
345
346#[derive(Debug)]
347pub enum ToolCallStatus {
348 /// The tool call hasn't started running yet, but we start showing it to
349 /// the user.
350 Pending,
351 /// The tool call is waiting for confirmation from the user.
352 WaitingForConfirmation {
353 options: Vec<acp::PermissionOption>,
354 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
355 },
356 /// The tool call is currently running.
357 InProgress,
358 /// The tool call completed successfully.
359 Completed,
360 /// The tool call failed.
361 Failed,
362 /// The user rejected the tool call.
363 Rejected,
364 /// The user canceled generation so the tool call was canceled.
365 Canceled,
366}
367
368impl From<acp::ToolCallStatus> for ToolCallStatus {
369 fn from(status: acp::ToolCallStatus) -> Self {
370 match status {
371 acp::ToolCallStatus::Pending => Self::Pending,
372 acp::ToolCallStatus::InProgress => Self::InProgress,
373 acp::ToolCallStatus::Completed => Self::Completed,
374 acp::ToolCallStatus::Failed => Self::Failed,
375 }
376 }
377}
378
379impl Display for ToolCallStatus {
380 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
381 write!(
382 f,
383 "{}",
384 match self {
385 ToolCallStatus::Pending => "Pending",
386 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
387 ToolCallStatus::InProgress => "In Progress",
388 ToolCallStatus::Completed => "Completed",
389 ToolCallStatus::Failed => "Failed",
390 ToolCallStatus::Rejected => "Rejected",
391 ToolCallStatus::Canceled => "Canceled",
392 }
393 )
394 }
395}
396
397#[derive(Debug, PartialEq, Clone)]
398pub enum ContentBlock {
399 Empty,
400 Markdown { markdown: Entity<Markdown> },
401 ResourceLink { resource_link: acp::ResourceLink },
402}
403
404impl ContentBlock {
405 pub fn new(
406 block: acp::ContentBlock,
407 language_registry: &Arc<LanguageRegistry>,
408 cx: &mut App,
409 ) -> Self {
410 let mut this = Self::Empty;
411 this.append(block, language_registry, cx);
412 this
413 }
414
415 pub fn new_combined(
416 blocks: impl IntoIterator<Item = acp::ContentBlock>,
417 language_registry: Arc<LanguageRegistry>,
418 cx: &mut App,
419 ) -> Self {
420 let mut this = Self::Empty;
421 for block in blocks {
422 this.append(block, &language_registry, cx);
423 }
424 this
425 }
426
427 pub fn append(
428 &mut self,
429 block: acp::ContentBlock,
430 language_registry: &Arc<LanguageRegistry>,
431 cx: &mut App,
432 ) {
433 if matches!(self, ContentBlock::Empty)
434 && let acp::ContentBlock::ResourceLink(resource_link) = block
435 {
436 *self = ContentBlock::ResourceLink { resource_link };
437 return;
438 }
439
440 let new_content = self.block_string_contents(block);
441
442 match self {
443 ContentBlock::Empty => {
444 *self = Self::create_markdown_block(new_content, language_registry, cx);
445 }
446 ContentBlock::Markdown { markdown } => {
447 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
448 }
449 ContentBlock::ResourceLink { resource_link } => {
450 let existing_content = Self::resource_link_md(&resource_link.uri);
451 let combined = format!("{}\n{}", existing_content, new_content);
452
453 *self = Self::create_markdown_block(combined, language_registry, cx);
454 }
455 }
456 }
457
458 fn create_markdown_block(
459 content: String,
460 language_registry: &Arc<LanguageRegistry>,
461 cx: &mut App,
462 ) -> ContentBlock {
463 ContentBlock::Markdown {
464 markdown: cx
465 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
466 }
467 }
468
469 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
470 match block {
471 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
472 acp::ContentBlock::ResourceLink(resource_link) => {
473 Self::resource_link_md(&resource_link.uri)
474 }
475 acp::ContentBlock::Resource(acp::EmbeddedResource {
476 resource:
477 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
478 uri,
479 ..
480 }),
481 ..
482 }) => Self::resource_link_md(&uri),
483 acp::ContentBlock::Image(image) => Self::image_md(&image),
484 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
485 }
486 }
487
488 fn resource_link_md(uri: &str) -> String {
489 if let Some(uri) = MentionUri::parse(uri).log_err() {
490 uri.as_link().to_string()
491 } else {
492 uri.to_string()
493 }
494 }
495
496 fn image_md(_image: &acp::ImageContent) -> String {
497 "`Image`".into()
498 }
499
500 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
501 match self {
502 ContentBlock::Empty => "",
503 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
504 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
505 }
506 }
507
508 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
509 match self {
510 ContentBlock::Empty => None,
511 ContentBlock::Markdown { markdown } => Some(markdown),
512 ContentBlock::ResourceLink { .. } => None,
513 }
514 }
515
516 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
517 match self {
518 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
519 _ => None,
520 }
521 }
522}
523
524#[derive(Debug)]
525pub enum ToolCallContent {
526 ContentBlock(ContentBlock),
527 Diff(Entity<Diff>),
528 Terminal(Entity<Terminal>),
529}
530
531impl ToolCallContent {
532 pub fn from_acp(
533 content: acp::ToolCallContent,
534 language_registry: Arc<LanguageRegistry>,
535 cx: &mut App,
536 ) -> Self {
537 match content {
538 acp::ToolCallContent::Content { content } => {
539 Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
540 }
541 acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
542 Diff::finalized(
543 diff.path,
544 diff.old_text,
545 diff.new_text,
546 language_registry,
547 cx,
548 )
549 })),
550 }
551 }
552
553 pub fn to_markdown(&self, cx: &App) -> String {
554 match self {
555 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
556 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
557 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
558 }
559 }
560}
561
562#[derive(Debug, PartialEq)]
563pub enum ToolCallUpdate {
564 UpdateFields(acp::ToolCallUpdate),
565 UpdateDiff(ToolCallUpdateDiff),
566 UpdateTerminal(ToolCallUpdateTerminal),
567}
568
569impl ToolCallUpdate {
570 fn id(&self) -> &acp::ToolCallId {
571 match self {
572 Self::UpdateFields(update) => &update.id,
573 Self::UpdateDiff(diff) => &diff.id,
574 Self::UpdateTerminal(terminal) => &terminal.id,
575 }
576 }
577}
578
579impl From<acp::ToolCallUpdate> for ToolCallUpdate {
580 fn from(update: acp::ToolCallUpdate) -> Self {
581 Self::UpdateFields(update)
582 }
583}
584
585impl From<ToolCallUpdateDiff> for ToolCallUpdate {
586 fn from(diff: ToolCallUpdateDiff) -> Self {
587 Self::UpdateDiff(diff)
588 }
589}
590
591#[derive(Debug, PartialEq)]
592pub struct ToolCallUpdateDiff {
593 pub id: acp::ToolCallId,
594 pub diff: Entity<Diff>,
595}
596
597impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
598 fn from(terminal: ToolCallUpdateTerminal) -> Self {
599 Self::UpdateTerminal(terminal)
600 }
601}
602
603#[derive(Debug, PartialEq)]
604pub struct ToolCallUpdateTerminal {
605 pub id: acp::ToolCallId,
606 pub terminal: Entity<Terminal>,
607}
608
609#[derive(Debug, Default)]
610pub struct Plan {
611 pub entries: Vec<PlanEntry>,
612}
613
614#[derive(Debug)]
615pub struct PlanStats<'a> {
616 pub in_progress_entry: Option<&'a PlanEntry>,
617 pub pending: u32,
618 pub completed: u32,
619}
620
621impl Plan {
622 pub fn is_empty(&self) -> bool {
623 self.entries.is_empty()
624 }
625
626 pub fn stats(&self) -> PlanStats<'_> {
627 let mut stats = PlanStats {
628 in_progress_entry: None,
629 pending: 0,
630 completed: 0,
631 };
632
633 for entry in &self.entries {
634 match &entry.status {
635 acp::PlanEntryStatus::Pending => {
636 stats.pending += 1;
637 }
638 acp::PlanEntryStatus::InProgress => {
639 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
640 }
641 acp::PlanEntryStatus::Completed => {
642 stats.completed += 1;
643 }
644 }
645 }
646
647 stats
648 }
649}
650
651#[derive(Debug)]
652pub struct PlanEntry {
653 pub content: Entity<Markdown>,
654 pub priority: acp::PlanEntryPriority,
655 pub status: acp::PlanEntryStatus,
656}
657
658impl PlanEntry {
659 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
660 Self {
661 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
662 priority: entry.priority,
663 status: entry.status,
664 }
665 }
666}
667
668#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
669pub struct TokenUsage {
670 pub max_tokens: u64,
671 pub used_tokens: u64,
672}
673
674#[derive(Debug, Clone)]
675pub struct RetryStatus {
676 pub last_error: SharedString,
677 pub attempt: usize,
678 pub max_attempts: usize,
679 pub started_at: Instant,
680 pub duration: Duration,
681}
682
683pub struct AcpThread {
684 title: SharedString,
685 entries: Vec<AgentThreadEntry>,
686 plan: Plan,
687 project: Entity<Project>,
688 action_log: Entity<ActionLog>,
689 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
690 send_task: Option<Task<()>>,
691 connection: Rc<dyn AgentConnection>,
692 session_id: acp::SessionId,
693 token_usage: Option<TokenUsage>,
694}
695
696#[derive(Debug)]
697pub enum AcpThreadEvent {
698 NewEntry,
699 TitleUpdated,
700 TokenUsageUpdated,
701 EntryUpdated(usize),
702 EntriesRemoved(Range<usize>),
703 ToolAuthorizationRequired,
704 Retry(RetryStatus),
705 Stopped,
706 Error,
707 ServerExited(ExitStatus),
708}
709
710impl EventEmitter<AcpThreadEvent> for AcpThread {}
711
712#[derive(PartialEq, Eq)]
713pub enum ThreadStatus {
714 Idle,
715 WaitingForToolConfirmation,
716 Generating,
717}
718
719#[derive(Debug, Clone)]
720pub enum LoadError {
721 Unsupported {
722 error_message: SharedString,
723 upgrade_message: SharedString,
724 upgrade_command: String,
725 },
726 Exited(i32),
727 Other(SharedString),
728}
729
730impl Display for LoadError {
731 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
732 match self {
733 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
734 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
735 LoadError::Other(msg) => write!(f, "{}", msg),
736 }
737 }
738}
739
740impl Error for LoadError {}
741
742impl AcpThread {
743 pub fn new(
744 title: impl Into<SharedString>,
745 connection: Rc<dyn AgentConnection>,
746 project: Entity<Project>,
747 action_log: Entity<ActionLog>,
748 session_id: acp::SessionId,
749 ) -> Self {
750 Self {
751 action_log,
752 shared_buffers: Default::default(),
753 entries: Default::default(),
754 plan: Default::default(),
755 title: title.into(),
756 project,
757 send_task: None,
758 connection,
759 session_id,
760 token_usage: None,
761 }
762 }
763
764 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
765 &self.connection
766 }
767
768 pub fn action_log(&self) -> &Entity<ActionLog> {
769 &self.action_log
770 }
771
772 pub fn project(&self) -> &Entity<Project> {
773 &self.project
774 }
775
776 pub fn title(&self) -> SharedString {
777 self.title.clone()
778 }
779
780 pub fn entries(&self) -> &[AgentThreadEntry] {
781 &self.entries
782 }
783
784 pub fn session_id(&self) -> &acp::SessionId {
785 &self.session_id
786 }
787
788 pub fn status(&self) -> ThreadStatus {
789 if self.send_task.is_some() {
790 if self.waiting_for_tool_confirmation() {
791 ThreadStatus::WaitingForToolConfirmation
792 } else {
793 ThreadStatus::Generating
794 }
795 } else {
796 ThreadStatus::Idle
797 }
798 }
799
800 pub fn token_usage(&self) -> Option<&TokenUsage> {
801 self.token_usage.as_ref()
802 }
803
804 pub fn has_pending_edit_tool_calls(&self) -> bool {
805 for entry in self.entries.iter().rev() {
806 match entry {
807 AgentThreadEntry::UserMessage(_) => return false,
808 AgentThreadEntry::ToolCall(
809 call @ ToolCall {
810 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
811 ..
812 },
813 ) if call.diffs().next().is_some() => {
814 return true;
815 }
816 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
817 }
818 }
819
820 false
821 }
822
823 pub fn used_tools_since_last_user_message(&self) -> bool {
824 for entry in self.entries.iter().rev() {
825 match entry {
826 AgentThreadEntry::UserMessage(..) => return false,
827 AgentThreadEntry::AssistantMessage(..) => continue,
828 AgentThreadEntry::ToolCall(..) => return true,
829 }
830 }
831
832 false
833 }
834
835 pub fn handle_session_update(
836 &mut self,
837 update: acp::SessionUpdate,
838 cx: &mut Context<Self>,
839 ) -> Result<(), acp::Error> {
840 match update {
841 acp::SessionUpdate::UserMessageChunk { content } => {
842 self.push_user_content_block(None, content, cx);
843 }
844 acp::SessionUpdate::AgentMessageChunk { content } => {
845 self.push_assistant_content_block(content, false, cx);
846 }
847 acp::SessionUpdate::AgentThoughtChunk { content } => {
848 self.push_assistant_content_block(content, true, cx);
849 }
850 acp::SessionUpdate::ToolCall(tool_call) => {
851 self.upsert_tool_call(tool_call, cx)?;
852 }
853 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
854 self.update_tool_call(tool_call_update, cx)?;
855 }
856 acp::SessionUpdate::Plan(plan) => {
857 self.update_plan(plan, cx);
858 }
859 }
860 Ok(())
861 }
862
863 pub fn push_user_content_block(
864 &mut self,
865 message_id: Option<UserMessageId>,
866 chunk: acp::ContentBlock,
867 cx: &mut Context<Self>,
868 ) {
869 let language_registry = self.project.read(cx).languages().clone();
870 let entries_len = self.entries.len();
871
872 if let Some(last_entry) = self.entries.last_mut()
873 && let AgentThreadEntry::UserMessage(UserMessage {
874 id,
875 content,
876 chunks,
877 ..
878 }) = last_entry
879 {
880 *id = message_id.or(id.take());
881 content.append(chunk.clone(), &language_registry, cx);
882 chunks.push(chunk);
883 let idx = entries_len - 1;
884 cx.emit(AcpThreadEvent::EntryUpdated(idx));
885 } else {
886 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
887 self.push_entry(
888 AgentThreadEntry::UserMessage(UserMessage {
889 id: message_id,
890 content,
891 chunks: vec![chunk],
892 checkpoint: None,
893 }),
894 cx,
895 );
896 }
897 }
898
899 pub fn push_assistant_content_block(
900 &mut self,
901 chunk: acp::ContentBlock,
902 is_thought: bool,
903 cx: &mut Context<Self>,
904 ) {
905 let language_registry = self.project.read(cx).languages().clone();
906 let entries_len = self.entries.len();
907 if let Some(last_entry) = self.entries.last_mut()
908 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
909 {
910 let idx = entries_len - 1;
911 cx.emit(AcpThreadEvent::EntryUpdated(idx));
912 match (chunks.last_mut(), is_thought) {
913 (Some(AssistantMessageChunk::Message { block }), false)
914 | (Some(AssistantMessageChunk::Thought { block }), true) => {
915 block.append(chunk, &language_registry, cx)
916 }
917 _ => {
918 let block = ContentBlock::new(chunk, &language_registry, cx);
919 if is_thought {
920 chunks.push(AssistantMessageChunk::Thought { block })
921 } else {
922 chunks.push(AssistantMessageChunk::Message { block })
923 }
924 }
925 }
926 } else {
927 let block = ContentBlock::new(chunk, &language_registry, cx);
928 let chunk = if is_thought {
929 AssistantMessageChunk::Thought { block }
930 } else {
931 AssistantMessageChunk::Message { block }
932 };
933
934 self.push_entry(
935 AgentThreadEntry::AssistantMessage(AssistantMessage {
936 chunks: vec![chunk],
937 }),
938 cx,
939 );
940 }
941 }
942
943 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
944 self.entries.push(entry);
945 cx.emit(AcpThreadEvent::NewEntry);
946 }
947
948 pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
949 self.title = title;
950 cx.emit(AcpThreadEvent::TitleUpdated);
951 Ok(())
952 }
953
954 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
955 self.token_usage = usage;
956 cx.emit(AcpThreadEvent::TokenUsageUpdated);
957 }
958
959 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
960 cx.emit(AcpThreadEvent::Retry(status));
961 }
962
963 pub fn update_tool_call(
964 &mut self,
965 update: impl Into<ToolCallUpdate>,
966 cx: &mut Context<Self>,
967 ) -> Result<()> {
968 let update = update.into();
969 let languages = self.project.read(cx).languages().clone();
970
971 let (ix, current_call) = self
972 .tool_call_mut(update.id())
973 .context("Tool call not found")?;
974 match update {
975 ToolCallUpdate::UpdateFields(update) => {
976 let location_updated = update.fields.locations.is_some();
977 current_call.update_fields(update.fields, languages, cx);
978 if location_updated {
979 self.resolve_locations(update.id.clone(), cx);
980 }
981 }
982 ToolCallUpdate::UpdateDiff(update) => {
983 current_call.content.clear();
984 current_call
985 .content
986 .push(ToolCallContent::Diff(update.diff));
987 }
988 ToolCallUpdate::UpdateTerminal(update) => {
989 current_call.content.clear();
990 current_call
991 .content
992 .push(ToolCallContent::Terminal(update.terminal));
993 }
994 }
995
996 cx.emit(AcpThreadEvent::EntryUpdated(ix));
997
998 Ok(())
999 }
1000
1001 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1002 pub fn upsert_tool_call(
1003 &mut self,
1004 tool_call: acp::ToolCall,
1005 cx: &mut Context<Self>,
1006 ) -> Result<(), acp::Error> {
1007 let status = tool_call.status.into();
1008 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1009 }
1010
1011 /// Fails if id does not match an existing entry.
1012 pub fn upsert_tool_call_inner(
1013 &mut self,
1014 tool_call_update: acp::ToolCallUpdate,
1015 status: ToolCallStatus,
1016 cx: &mut Context<Self>,
1017 ) -> Result<(), acp::Error> {
1018 let language_registry = self.project.read(cx).languages().clone();
1019 let id = tool_call_update.id.clone();
1020
1021 if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1022 current_call.update_fields(tool_call_update.fields, language_registry, cx);
1023 current_call.status = status;
1024
1025 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1026 } else {
1027 let call =
1028 ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1029 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1030 };
1031
1032 self.resolve_locations(id, cx);
1033 Ok(())
1034 }
1035
1036 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1037 // The tool call we are looking for is typically the last one, or very close to the end.
1038 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1039 self.entries
1040 .iter_mut()
1041 .enumerate()
1042 .rev()
1043 .find_map(|(index, tool_call)| {
1044 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1045 && &tool_call.id == id
1046 {
1047 Some((index, tool_call))
1048 } else {
1049 None
1050 }
1051 })
1052 }
1053
1054 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1055 let project = self.project.clone();
1056 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1057 return;
1058 };
1059 let task = tool_call.resolve_locations(project, cx);
1060 cx.spawn(async move |this, cx| {
1061 let resolved_locations = task.await;
1062 this.update(cx, |this, cx| {
1063 let project = this.project.clone();
1064 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1065 return;
1066 };
1067 if let Some(Some(location)) = resolved_locations.last() {
1068 project.update(cx, |project, cx| {
1069 if let Some(agent_location) = project.agent_location() {
1070 let should_ignore = agent_location.buffer == location.buffer
1071 && location
1072 .buffer
1073 .update(cx, |buffer, _| {
1074 let snapshot = buffer.snapshot();
1075 let old_position =
1076 agent_location.position.to_point(&snapshot);
1077 let new_position = location.position.to_point(&snapshot);
1078 // ignore this so that when we get updates from the edit tool
1079 // the position doesn't reset to the startof line
1080 old_position.row == new_position.row
1081 && old_position.column > new_position.column
1082 })
1083 .ok()
1084 .unwrap_or_default();
1085 if !should_ignore {
1086 project.set_agent_location(Some(location.clone()), cx);
1087 }
1088 }
1089 });
1090 }
1091 if tool_call.resolved_locations != resolved_locations {
1092 tool_call.resolved_locations = resolved_locations;
1093 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1094 }
1095 })
1096 })
1097 .detach();
1098 }
1099
1100 pub fn request_tool_call_authorization(
1101 &mut self,
1102 tool_call: acp::ToolCallUpdate,
1103 options: Vec<acp::PermissionOption>,
1104 cx: &mut Context<Self>,
1105 ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1106 let (tx, rx) = oneshot::channel();
1107
1108 let status = ToolCallStatus::WaitingForConfirmation {
1109 options,
1110 respond_tx: tx,
1111 };
1112
1113 self.upsert_tool_call_inner(tool_call, status, cx)?;
1114 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1115 Ok(rx)
1116 }
1117
1118 pub fn authorize_tool_call(
1119 &mut self,
1120 id: acp::ToolCallId,
1121 option_id: acp::PermissionOptionId,
1122 option_kind: acp::PermissionOptionKind,
1123 cx: &mut Context<Self>,
1124 ) {
1125 let Some((ix, call)) = self.tool_call_mut(&id) else {
1126 return;
1127 };
1128
1129 let new_status = match option_kind {
1130 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1131 ToolCallStatus::Rejected
1132 }
1133 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1134 ToolCallStatus::InProgress
1135 }
1136 };
1137
1138 let curr_status = mem::replace(&mut call.status, new_status);
1139
1140 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1141 respond_tx.send(option_id).log_err();
1142 } else if cfg!(debug_assertions) {
1143 panic!("tried to authorize an already authorized tool call");
1144 }
1145
1146 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1147 }
1148
1149 /// Returns true if the last turn is awaiting tool authorization
1150 pub fn waiting_for_tool_confirmation(&self) -> bool {
1151 for entry in self.entries.iter().rev() {
1152 match &entry {
1153 AgentThreadEntry::ToolCall(call) => match call.status {
1154 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1155 ToolCallStatus::Pending
1156 | ToolCallStatus::InProgress
1157 | ToolCallStatus::Completed
1158 | ToolCallStatus::Failed
1159 | ToolCallStatus::Rejected
1160 | ToolCallStatus::Canceled => continue,
1161 },
1162 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1163 // Reached the beginning of the turn
1164 return false;
1165 }
1166 }
1167 }
1168 false
1169 }
1170
1171 pub fn plan(&self) -> &Plan {
1172 &self.plan
1173 }
1174
1175 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1176 let new_entries_len = request.entries.len();
1177 let mut new_entries = request.entries.into_iter();
1178
1179 // Reuse existing markdown to prevent flickering
1180 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1181 let PlanEntry {
1182 content,
1183 priority,
1184 status,
1185 } = old;
1186 content.update(cx, |old, cx| {
1187 old.replace(new.content, cx);
1188 });
1189 *priority = new.priority;
1190 *status = new.status;
1191 }
1192 for new in new_entries {
1193 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1194 }
1195 self.plan.entries.truncate(new_entries_len);
1196
1197 cx.notify();
1198 }
1199
1200 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1201 self.plan
1202 .entries
1203 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1204 cx.notify();
1205 }
1206
1207 #[cfg(any(test, feature = "test-support"))]
1208 pub fn send_raw(
1209 &mut self,
1210 message: &str,
1211 cx: &mut Context<Self>,
1212 ) -> BoxFuture<'static, Result<()>> {
1213 self.send(
1214 vec![acp::ContentBlock::Text(acp::TextContent {
1215 text: message.to_string(),
1216 annotations: None,
1217 })],
1218 cx,
1219 )
1220 }
1221
1222 pub fn send(
1223 &mut self,
1224 message: Vec<acp::ContentBlock>,
1225 cx: &mut Context<Self>,
1226 ) -> BoxFuture<'static, Result<()>> {
1227 let block = ContentBlock::new_combined(
1228 message.clone(),
1229 self.project.read(cx).languages().clone(),
1230 cx,
1231 );
1232 let request = acp::PromptRequest {
1233 prompt: message.clone(),
1234 session_id: self.session_id.clone(),
1235 };
1236 let git_store = self.project.read(cx).git_store().clone();
1237
1238 let message_id = if self
1239 .connection
1240 .session_editor(&self.session_id, cx)
1241 .is_some()
1242 {
1243 Some(UserMessageId::new())
1244 } else {
1245 None
1246 };
1247
1248 self.run_turn(cx, async move |this, cx| {
1249 this.update(cx, |this, cx| {
1250 this.push_entry(
1251 AgentThreadEntry::UserMessage(UserMessage {
1252 id: message_id.clone(),
1253 content: block,
1254 chunks: message,
1255 checkpoint: None,
1256 }),
1257 cx,
1258 );
1259 })
1260 .ok();
1261
1262 let old_checkpoint = git_store
1263 .update(cx, |git, cx| git.checkpoint(cx))?
1264 .await
1265 .context("failed to get old checkpoint")
1266 .log_err();
1267 this.update(cx, |this, cx| {
1268 if let Some((_ix, message)) = this.last_user_message() {
1269 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1270 git_checkpoint,
1271 show: false,
1272 });
1273 }
1274 this.connection.prompt(message_id, request, cx)
1275 })?
1276 .await
1277 })
1278 }
1279
1280 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1281 self.run_turn(cx, async move |this, cx| {
1282 this.update(cx, |this, cx| {
1283 this.connection
1284 .resume(&this.session_id, cx)
1285 .map(|resume| resume.run(cx))
1286 })?
1287 .context("resuming a session is not supported")?
1288 .await
1289 })
1290 }
1291
1292 fn run_turn(
1293 &mut self,
1294 cx: &mut Context<Self>,
1295 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1296 ) -> BoxFuture<'static, Result<()>> {
1297 self.clear_completed_plan_entries(cx);
1298
1299 let (tx, rx) = oneshot::channel();
1300 let cancel_task = self.cancel(cx);
1301
1302 self.send_task = Some(cx.spawn(async move |this, cx| {
1303 cancel_task.await;
1304 tx.send(f(this, cx).await).ok();
1305 }));
1306
1307 cx.spawn(async move |this, cx| {
1308 let response = rx.await;
1309
1310 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1311 .await?;
1312
1313 this.update(cx, |this, cx| {
1314 this.project
1315 .update(cx, |project, cx| project.set_agent_location(None, cx));
1316 match response {
1317 Ok(Err(e)) => {
1318 this.send_task.take();
1319 cx.emit(AcpThreadEvent::Error);
1320 Err(e)
1321 }
1322 result => {
1323 let canceled = matches!(
1324 result,
1325 Ok(Ok(acp::PromptResponse {
1326 stop_reason: acp::StopReason::Canceled
1327 }))
1328 );
1329
1330 // We only take the task if the current prompt wasn't canceled.
1331 //
1332 // This prompt may have been canceled because another one was sent
1333 // while it was still generating. In these cases, dropping `send_task`
1334 // would cause the next generation to be canceled.
1335 if !canceled {
1336 this.send_task.take();
1337 }
1338
1339 cx.emit(AcpThreadEvent::Stopped);
1340 Ok(())
1341 }
1342 }
1343 })?
1344 })
1345 .boxed()
1346 }
1347
1348 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1349 let Some(send_task) = self.send_task.take() else {
1350 return Task::ready(());
1351 };
1352
1353 for entry in self.entries.iter_mut() {
1354 if let AgentThreadEntry::ToolCall(call) = entry {
1355 let cancel = matches!(
1356 call.status,
1357 ToolCallStatus::Pending
1358 | ToolCallStatus::WaitingForConfirmation { .. }
1359 | ToolCallStatus::InProgress
1360 );
1361
1362 if cancel {
1363 call.status = ToolCallStatus::Canceled;
1364 }
1365 }
1366 }
1367
1368 self.connection.cancel(&self.session_id, cx);
1369
1370 // Wait for the send task to complete
1371 cx.foreground_executor().spawn(send_task)
1372 }
1373
1374 /// Rewinds this thread to before the entry at `index`, removing it and all
1375 /// subsequent entries while reverting any changes made from that point.
1376 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1377 let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1378 return Task::ready(Err(anyhow!("not supported")));
1379 };
1380 let Some(message) = self.user_message(&id) else {
1381 return Task::ready(Err(anyhow!("message not found")));
1382 };
1383
1384 let checkpoint = message
1385 .checkpoint
1386 .as_ref()
1387 .map(|c| c.git_checkpoint.clone());
1388
1389 let git_store = self.project.read(cx).git_store().clone();
1390 cx.spawn(async move |this, cx| {
1391 if let Some(checkpoint) = checkpoint {
1392 git_store
1393 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1394 .await?;
1395 }
1396
1397 cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1398 .await?;
1399 this.update(cx, |this, cx| {
1400 if let Some((ix, _)) = this.user_message_mut(&id) {
1401 let range = ix..this.entries.len();
1402 this.entries.truncate(ix);
1403 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1404 }
1405 })
1406 })
1407 }
1408
1409 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1410 let git_store = self.project.read(cx).git_store().clone();
1411
1412 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1413 if let Some(checkpoint) = message.checkpoint.as_ref() {
1414 checkpoint.git_checkpoint.clone()
1415 } else {
1416 return Task::ready(Ok(()));
1417 }
1418 } else {
1419 return Task::ready(Ok(()));
1420 };
1421
1422 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1423 cx.spawn(async move |this, cx| {
1424 let new_checkpoint = new_checkpoint
1425 .await
1426 .context("failed to get new checkpoint")
1427 .log_err();
1428 if let Some(new_checkpoint) = new_checkpoint {
1429 let equal = git_store
1430 .update(cx, |git, cx| {
1431 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1432 })?
1433 .await
1434 .unwrap_or(true);
1435 this.update(cx, |this, cx| {
1436 let (ix, message) = this.last_user_message().context("no user message")?;
1437 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1438 checkpoint.show = !equal;
1439 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1440 anyhow::Ok(())
1441 })??;
1442 }
1443
1444 Ok(())
1445 })
1446 }
1447
1448 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1449 self.entries
1450 .iter_mut()
1451 .enumerate()
1452 .rev()
1453 .find_map(|(ix, entry)| {
1454 if let AgentThreadEntry::UserMessage(message) = entry {
1455 Some((ix, message))
1456 } else {
1457 None
1458 }
1459 })
1460 }
1461
1462 fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1463 self.entries.iter().find_map(|entry| {
1464 if let AgentThreadEntry::UserMessage(message) = entry {
1465 if message.id.as_ref() == Some(id) {
1466 Some(message)
1467 } else {
1468 None
1469 }
1470 } else {
1471 None
1472 }
1473 })
1474 }
1475
1476 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1477 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1478 if let AgentThreadEntry::UserMessage(message) = entry {
1479 if message.id.as_ref() == Some(id) {
1480 Some((ix, message))
1481 } else {
1482 None
1483 }
1484 } else {
1485 None
1486 }
1487 })
1488 }
1489
1490 pub fn read_text_file(
1491 &self,
1492 path: PathBuf,
1493 line: Option<u32>,
1494 limit: Option<u32>,
1495 reuse_shared_snapshot: bool,
1496 cx: &mut Context<Self>,
1497 ) -> Task<Result<String>> {
1498 let project = self.project.clone();
1499 let action_log = self.action_log.clone();
1500 cx.spawn(async move |this, cx| {
1501 let load = project.update(cx, |project, cx| {
1502 let path = project
1503 .project_path_for_absolute_path(&path, cx)
1504 .context("invalid path")?;
1505 anyhow::Ok(project.open_buffer(path, cx))
1506 });
1507 let buffer = load??.await?;
1508
1509 let snapshot = if reuse_shared_snapshot {
1510 this.read_with(cx, |this, _| {
1511 this.shared_buffers.get(&buffer.clone()).cloned()
1512 })
1513 .log_err()
1514 .flatten()
1515 } else {
1516 None
1517 };
1518
1519 let snapshot = if let Some(snapshot) = snapshot {
1520 snapshot
1521 } else {
1522 action_log.update(cx, |action_log, cx| {
1523 action_log.buffer_read(buffer.clone(), cx);
1524 })?;
1525 project.update(cx, |project, cx| {
1526 let position = buffer
1527 .read(cx)
1528 .snapshot()
1529 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1530 project.set_agent_location(
1531 Some(AgentLocation {
1532 buffer: buffer.downgrade(),
1533 position,
1534 }),
1535 cx,
1536 );
1537 })?;
1538
1539 buffer.update(cx, |buffer, _| buffer.snapshot())?
1540 };
1541
1542 this.update(cx, |this, _| {
1543 let text = snapshot.text();
1544 this.shared_buffers.insert(buffer.clone(), snapshot);
1545 if line.is_none() && limit.is_none() {
1546 return Ok(text);
1547 }
1548 let limit = limit.unwrap_or(u32::MAX) as usize;
1549 let Some(line) = line else {
1550 return Ok(text.lines().take(limit).collect::<String>());
1551 };
1552
1553 let count = text.lines().count();
1554 if count < line as usize {
1555 anyhow::bail!("There are only {} lines", count);
1556 }
1557 Ok(text
1558 .lines()
1559 .skip(line as usize + 1)
1560 .take(limit)
1561 .collect::<String>())
1562 })?
1563 })
1564 }
1565
1566 pub fn write_text_file(
1567 &self,
1568 path: PathBuf,
1569 content: String,
1570 cx: &mut Context<Self>,
1571 ) -> Task<Result<()>> {
1572 let project = self.project.clone();
1573 let action_log = self.action_log.clone();
1574 cx.spawn(async move |this, cx| {
1575 let load = project.update(cx, |project, cx| {
1576 let path = project
1577 .project_path_for_absolute_path(&path, cx)
1578 .context("invalid path")?;
1579 anyhow::Ok(project.open_buffer(path, cx))
1580 });
1581 let buffer = load??.await?;
1582 let snapshot = this.update(cx, |this, cx| {
1583 this.shared_buffers
1584 .get(&buffer)
1585 .cloned()
1586 .unwrap_or_else(|| buffer.read(cx).snapshot())
1587 })?;
1588 let edits = cx
1589 .background_executor()
1590 .spawn(async move {
1591 let old_text = snapshot.text();
1592 text_diff(old_text.as_str(), &content)
1593 .into_iter()
1594 .map(|(range, replacement)| {
1595 (
1596 snapshot.anchor_after(range.start)
1597 ..snapshot.anchor_before(range.end),
1598 replacement,
1599 )
1600 })
1601 .collect::<Vec<_>>()
1602 })
1603 .await;
1604 cx.update(|cx| {
1605 project.update(cx, |project, cx| {
1606 project.set_agent_location(
1607 Some(AgentLocation {
1608 buffer: buffer.downgrade(),
1609 position: edits
1610 .last()
1611 .map(|(range, _)| range.end)
1612 .unwrap_or(Anchor::MIN),
1613 }),
1614 cx,
1615 );
1616 });
1617
1618 action_log.update(cx, |action_log, cx| {
1619 action_log.buffer_read(buffer.clone(), cx);
1620 });
1621 buffer.update(cx, |buffer, cx| {
1622 buffer.edit(edits, None, cx);
1623 });
1624 action_log.update(cx, |action_log, cx| {
1625 action_log.buffer_edited(buffer.clone(), cx);
1626 });
1627 })?;
1628 project
1629 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1630 .await
1631 })
1632 }
1633
1634 pub fn to_markdown(&self, cx: &App) -> String {
1635 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1636 }
1637
1638 pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1639 cx.emit(AcpThreadEvent::ServerExited(status));
1640 }
1641}
1642
1643fn markdown_for_raw_output(
1644 raw_output: &serde_json::Value,
1645 language_registry: &Arc<LanguageRegistry>,
1646 cx: &mut App,
1647) -> Option<Entity<Markdown>> {
1648 match raw_output {
1649 serde_json::Value::Null => None,
1650 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1651 Markdown::new(
1652 value.to_string().into(),
1653 Some(language_registry.clone()),
1654 None,
1655 cx,
1656 )
1657 })),
1658 serde_json::Value::Number(value) => Some(cx.new(|cx| {
1659 Markdown::new(
1660 value.to_string().into(),
1661 Some(language_registry.clone()),
1662 None,
1663 cx,
1664 )
1665 })),
1666 serde_json::Value::String(value) => Some(cx.new(|cx| {
1667 Markdown::new(
1668 value.clone().into(),
1669 Some(language_registry.clone()),
1670 None,
1671 cx,
1672 )
1673 })),
1674 value => Some(cx.new(|cx| {
1675 Markdown::new(
1676 format!("```json\n{}\n```", value).into(),
1677 Some(language_registry.clone()),
1678 None,
1679 cx,
1680 )
1681 })),
1682 }
1683}
1684
1685#[cfg(test)]
1686mod tests {
1687 use super::*;
1688 use anyhow::anyhow;
1689 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1690 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1691 use indoc::indoc;
1692 use project::{FakeFs, Fs};
1693 use rand::Rng as _;
1694 use serde_json::json;
1695 use settings::SettingsStore;
1696 use smol::stream::StreamExt as _;
1697 use std::{
1698 any::Any,
1699 cell::RefCell,
1700 path::Path,
1701 rc::Rc,
1702 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1703 time::Duration,
1704 };
1705 use util::path;
1706
1707 fn init_test(cx: &mut TestAppContext) {
1708 env_logger::try_init().ok();
1709 cx.update(|cx| {
1710 let settings_store = SettingsStore::test(cx);
1711 cx.set_global(settings_store);
1712 Project::init_settings(cx);
1713 language::init(cx);
1714 });
1715 }
1716
1717 #[gpui::test]
1718 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1719 init_test(cx);
1720
1721 let fs = FakeFs::new(cx.executor());
1722 let project = Project::test(fs, [], cx).await;
1723 let connection = Rc::new(FakeAgentConnection::new());
1724 let thread = cx
1725 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1726 .await
1727 .unwrap();
1728
1729 // Test creating a new user message
1730 thread.update(cx, |thread, cx| {
1731 thread.push_user_content_block(
1732 None,
1733 acp::ContentBlock::Text(acp::TextContent {
1734 annotations: None,
1735 text: "Hello, ".to_string(),
1736 }),
1737 cx,
1738 );
1739 });
1740
1741 thread.update(cx, |thread, cx| {
1742 assert_eq!(thread.entries.len(), 1);
1743 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1744 assert_eq!(user_msg.id, None);
1745 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1746 } else {
1747 panic!("Expected UserMessage");
1748 }
1749 });
1750
1751 // Test appending to existing user message
1752 let message_1_id = UserMessageId::new();
1753 thread.update(cx, |thread, cx| {
1754 thread.push_user_content_block(
1755 Some(message_1_id.clone()),
1756 acp::ContentBlock::Text(acp::TextContent {
1757 annotations: None,
1758 text: "world!".to_string(),
1759 }),
1760 cx,
1761 );
1762 });
1763
1764 thread.update(cx, |thread, cx| {
1765 assert_eq!(thread.entries.len(), 1);
1766 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1767 assert_eq!(user_msg.id, Some(message_1_id));
1768 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1769 } else {
1770 panic!("Expected UserMessage");
1771 }
1772 });
1773
1774 // Test creating new user message after assistant message
1775 thread.update(cx, |thread, cx| {
1776 thread.push_assistant_content_block(
1777 acp::ContentBlock::Text(acp::TextContent {
1778 annotations: None,
1779 text: "Assistant response".to_string(),
1780 }),
1781 false,
1782 cx,
1783 );
1784 });
1785
1786 let message_2_id = UserMessageId::new();
1787 thread.update(cx, |thread, cx| {
1788 thread.push_user_content_block(
1789 Some(message_2_id.clone()),
1790 acp::ContentBlock::Text(acp::TextContent {
1791 annotations: None,
1792 text: "New user message".to_string(),
1793 }),
1794 cx,
1795 );
1796 });
1797
1798 thread.update(cx, |thread, cx| {
1799 assert_eq!(thread.entries.len(), 3);
1800 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1801 assert_eq!(user_msg.id, Some(message_2_id));
1802 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1803 } else {
1804 panic!("Expected UserMessage at index 2");
1805 }
1806 });
1807 }
1808
1809 #[gpui::test]
1810 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1811 init_test(cx);
1812
1813 let fs = FakeFs::new(cx.executor());
1814 let project = Project::test(fs, [], cx).await;
1815 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1816 |_, thread, mut cx| {
1817 async move {
1818 thread.update(&mut cx, |thread, cx| {
1819 thread
1820 .handle_session_update(
1821 acp::SessionUpdate::AgentThoughtChunk {
1822 content: "Thinking ".into(),
1823 },
1824 cx,
1825 )
1826 .unwrap();
1827 thread
1828 .handle_session_update(
1829 acp::SessionUpdate::AgentThoughtChunk {
1830 content: "hard!".into(),
1831 },
1832 cx,
1833 )
1834 .unwrap();
1835 })?;
1836 Ok(acp::PromptResponse {
1837 stop_reason: acp::StopReason::EndTurn,
1838 })
1839 }
1840 .boxed_local()
1841 },
1842 ));
1843
1844 let thread = cx
1845 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1846 .await
1847 .unwrap();
1848
1849 thread
1850 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1851 .await
1852 .unwrap();
1853
1854 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1855 assert_eq!(
1856 output,
1857 indoc! {r#"
1858 ## User
1859
1860 Hello from Zed!
1861
1862 ## Assistant
1863
1864 <thinking>
1865 Thinking hard!
1866 </thinking>
1867
1868 "#}
1869 );
1870 }
1871
1872 #[gpui::test]
1873 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1874 init_test(cx);
1875
1876 let fs = FakeFs::new(cx.executor());
1877 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1878 .await;
1879 let project = Project::test(fs.clone(), [], cx).await;
1880 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1881 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1882 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1883 move |_, thread, mut cx| {
1884 let read_file_tx = read_file_tx.clone();
1885 async move {
1886 let content = thread
1887 .update(&mut cx, |thread, cx| {
1888 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1889 })
1890 .unwrap()
1891 .await
1892 .unwrap();
1893 assert_eq!(content, "one\ntwo\nthree\n");
1894 read_file_tx.take().unwrap().send(()).unwrap();
1895 thread
1896 .update(&mut cx, |thread, cx| {
1897 thread.write_text_file(
1898 path!("/tmp/foo").into(),
1899 "one\ntwo\nthree\nfour\nfive\n".to_string(),
1900 cx,
1901 )
1902 })
1903 .unwrap()
1904 .await
1905 .unwrap();
1906 Ok(acp::PromptResponse {
1907 stop_reason: acp::StopReason::EndTurn,
1908 })
1909 }
1910 .boxed_local()
1911 },
1912 ));
1913
1914 let (worktree, pathbuf) = project
1915 .update(cx, |project, cx| {
1916 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1917 })
1918 .await
1919 .unwrap();
1920 let buffer = project
1921 .update(cx, |project, cx| {
1922 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1923 })
1924 .await
1925 .unwrap();
1926
1927 let thread = cx
1928 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1929 .await
1930 .unwrap();
1931
1932 let request = thread.update(cx, |thread, cx| {
1933 thread.send_raw("Extend the count in /tmp/foo", cx)
1934 });
1935 read_file_rx.await.ok();
1936 buffer.update(cx, |buffer, cx| {
1937 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1938 });
1939 cx.run_until_parked();
1940 assert_eq!(
1941 buffer.read_with(cx, |buffer, _| buffer.text()),
1942 "zero\none\ntwo\nthree\nfour\nfive\n"
1943 );
1944 assert_eq!(
1945 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1946 "zero\none\ntwo\nthree\nfour\nfive\n"
1947 );
1948 request.await.unwrap();
1949 }
1950
1951 #[gpui::test]
1952 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1953 init_test(cx);
1954
1955 let fs = FakeFs::new(cx.executor());
1956 let project = Project::test(fs, [], cx).await;
1957 let id = acp::ToolCallId("test".into());
1958
1959 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1960 let id = id.clone();
1961 move |_, thread, mut cx| {
1962 let id = id.clone();
1963 async move {
1964 thread
1965 .update(&mut cx, |thread, cx| {
1966 thread.handle_session_update(
1967 acp::SessionUpdate::ToolCall(acp::ToolCall {
1968 id: id.clone(),
1969 title: "Label".into(),
1970 kind: acp::ToolKind::Fetch,
1971 status: acp::ToolCallStatus::InProgress,
1972 content: vec![],
1973 locations: vec![],
1974 raw_input: None,
1975 raw_output: None,
1976 }),
1977 cx,
1978 )
1979 })
1980 .unwrap()
1981 .unwrap();
1982 Ok(acp::PromptResponse {
1983 stop_reason: acp::StopReason::EndTurn,
1984 })
1985 }
1986 .boxed_local()
1987 }
1988 }));
1989
1990 let thread = cx
1991 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1992 .await
1993 .unwrap();
1994
1995 let request = thread.update(cx, |thread, cx| {
1996 thread.send_raw("Fetch https://example.com", cx)
1997 });
1998
1999 run_until_first_tool_call(&thread, cx).await;
2000
2001 thread.read_with(cx, |thread, _| {
2002 assert!(matches!(
2003 thread.entries[1],
2004 AgentThreadEntry::ToolCall(ToolCall {
2005 status: ToolCallStatus::InProgress,
2006 ..
2007 })
2008 ));
2009 });
2010
2011 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2012
2013 thread.read_with(cx, |thread, _| {
2014 assert!(matches!(
2015 &thread.entries[1],
2016 AgentThreadEntry::ToolCall(ToolCall {
2017 status: ToolCallStatus::Canceled,
2018 ..
2019 })
2020 ));
2021 });
2022
2023 thread
2024 .update(cx, |thread, cx| {
2025 thread.handle_session_update(
2026 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2027 id,
2028 fields: acp::ToolCallUpdateFields {
2029 status: Some(acp::ToolCallStatus::Completed),
2030 ..Default::default()
2031 },
2032 }),
2033 cx,
2034 )
2035 })
2036 .unwrap();
2037
2038 request.await.unwrap();
2039
2040 thread.read_with(cx, |thread, _| {
2041 assert!(matches!(
2042 thread.entries[1],
2043 AgentThreadEntry::ToolCall(ToolCall {
2044 status: ToolCallStatus::Completed,
2045 ..
2046 })
2047 ));
2048 });
2049 }
2050
2051 #[gpui::test]
2052 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2053 init_test(cx);
2054 let fs = FakeFs::new(cx.background_executor.clone());
2055 fs.insert_tree(path!("/test"), json!({})).await;
2056 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2057
2058 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2059 move |_, thread, mut cx| {
2060 async move {
2061 thread
2062 .update(&mut cx, |thread, cx| {
2063 thread.handle_session_update(
2064 acp::SessionUpdate::ToolCall(acp::ToolCall {
2065 id: acp::ToolCallId("test".into()),
2066 title: "Label".into(),
2067 kind: acp::ToolKind::Edit,
2068 status: acp::ToolCallStatus::Completed,
2069 content: vec![acp::ToolCallContent::Diff {
2070 diff: acp::Diff {
2071 path: "/test/test.txt".into(),
2072 old_text: None,
2073 new_text: "foo".into(),
2074 },
2075 }],
2076 locations: vec![],
2077 raw_input: None,
2078 raw_output: None,
2079 }),
2080 cx,
2081 )
2082 })
2083 .unwrap()
2084 .unwrap();
2085 Ok(acp::PromptResponse {
2086 stop_reason: acp::StopReason::EndTurn,
2087 })
2088 }
2089 .boxed_local()
2090 }
2091 }));
2092
2093 let thread = cx
2094 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2095 .await
2096 .unwrap();
2097
2098 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2099 .await
2100 .unwrap();
2101
2102 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2103 }
2104
2105 #[gpui::test(iterations = 10)]
2106 async fn test_checkpoints(cx: &mut TestAppContext) {
2107 init_test(cx);
2108 let fs = FakeFs::new(cx.background_executor.clone());
2109 fs.insert_tree(
2110 path!("/test"),
2111 json!({
2112 ".git": {}
2113 }),
2114 )
2115 .await;
2116 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2117
2118 let simulate_changes = Arc::new(AtomicBool::new(true));
2119 let next_filename = Arc::new(AtomicUsize::new(0));
2120 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2121 let simulate_changes = simulate_changes.clone();
2122 let next_filename = next_filename.clone();
2123 let fs = fs.clone();
2124 move |request, thread, mut cx| {
2125 let fs = fs.clone();
2126 let simulate_changes = simulate_changes.clone();
2127 let next_filename = next_filename.clone();
2128 async move {
2129 if simulate_changes.load(SeqCst) {
2130 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2131 fs.write(Path::new(&filename), b"").await?;
2132 }
2133
2134 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2135 panic!("expected text content block");
2136 };
2137 thread.update(&mut cx, |thread, cx| {
2138 thread
2139 .handle_session_update(
2140 acp::SessionUpdate::AgentMessageChunk {
2141 content: content.text.to_uppercase().into(),
2142 },
2143 cx,
2144 )
2145 .unwrap();
2146 })?;
2147 Ok(acp::PromptResponse {
2148 stop_reason: acp::StopReason::EndTurn,
2149 })
2150 }
2151 .boxed_local()
2152 }
2153 }));
2154 let thread = cx
2155 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2156 .await
2157 .unwrap();
2158
2159 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2160 .await
2161 .unwrap();
2162 thread.read_with(cx, |thread, cx| {
2163 assert_eq!(
2164 thread.to_markdown(cx),
2165 indoc! {"
2166 ## User (checkpoint)
2167
2168 Lorem
2169
2170 ## Assistant
2171
2172 LOREM
2173
2174 "}
2175 );
2176 });
2177 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2178
2179 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2180 .await
2181 .unwrap();
2182 thread.read_with(cx, |thread, cx| {
2183 assert_eq!(
2184 thread.to_markdown(cx),
2185 indoc! {"
2186 ## User (checkpoint)
2187
2188 Lorem
2189
2190 ## Assistant
2191
2192 LOREM
2193
2194 ## User (checkpoint)
2195
2196 ipsum
2197
2198 ## Assistant
2199
2200 IPSUM
2201
2202 "}
2203 );
2204 });
2205 assert_eq!(
2206 fs.files(),
2207 vec![
2208 Path::new(path!("/test/file-0")),
2209 Path::new(path!("/test/file-1"))
2210 ]
2211 );
2212
2213 // Checkpoint isn't stored when there are no changes.
2214 simulate_changes.store(false, SeqCst);
2215 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2216 .await
2217 .unwrap();
2218 thread.read_with(cx, |thread, cx| {
2219 assert_eq!(
2220 thread.to_markdown(cx),
2221 indoc! {"
2222 ## User (checkpoint)
2223
2224 Lorem
2225
2226 ## Assistant
2227
2228 LOREM
2229
2230 ## User (checkpoint)
2231
2232 ipsum
2233
2234 ## Assistant
2235
2236 IPSUM
2237
2238 ## User
2239
2240 dolor
2241
2242 ## Assistant
2243
2244 DOLOR
2245
2246 "}
2247 );
2248 });
2249 assert_eq!(
2250 fs.files(),
2251 vec![
2252 Path::new(path!("/test/file-0")),
2253 Path::new(path!("/test/file-1"))
2254 ]
2255 );
2256
2257 // Rewinding the conversation truncates the history and restores the checkpoint.
2258 thread
2259 .update(cx, |thread, cx| {
2260 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2261 panic!("unexpected entries {:?}", thread.entries)
2262 };
2263 thread.rewind(message.id.clone().unwrap(), cx)
2264 })
2265 .await
2266 .unwrap();
2267 thread.read_with(cx, |thread, cx| {
2268 assert_eq!(
2269 thread.to_markdown(cx),
2270 indoc! {"
2271 ## User (checkpoint)
2272
2273 Lorem
2274
2275 ## Assistant
2276
2277 LOREM
2278
2279 "}
2280 );
2281 });
2282 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2283 }
2284
2285 async fn run_until_first_tool_call(
2286 thread: &Entity<AcpThread>,
2287 cx: &mut TestAppContext,
2288 ) -> usize {
2289 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2290
2291 let subscription = cx.update(|cx| {
2292 cx.subscribe(thread, move |thread, _, cx| {
2293 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2294 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2295 return tx.try_send(ix).unwrap();
2296 }
2297 }
2298 })
2299 });
2300
2301 select! {
2302 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2303 panic!("Timeout waiting for tool call")
2304 }
2305 ix = rx.next().fuse() => {
2306 drop(subscription);
2307 ix.unwrap()
2308 }
2309 }
2310 }
2311
2312 #[derive(Clone, Default)]
2313 struct FakeAgentConnection {
2314 auth_methods: Vec<acp::AuthMethod>,
2315 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2316 on_user_message: Option<
2317 Rc<
2318 dyn Fn(
2319 acp::PromptRequest,
2320 WeakEntity<AcpThread>,
2321 AsyncApp,
2322 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2323 + 'static,
2324 >,
2325 >,
2326 }
2327
2328 impl FakeAgentConnection {
2329 fn new() -> Self {
2330 Self {
2331 auth_methods: Vec::new(),
2332 on_user_message: None,
2333 sessions: Arc::default(),
2334 }
2335 }
2336
2337 #[expect(unused)]
2338 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2339 self.auth_methods = auth_methods;
2340 self
2341 }
2342
2343 fn on_user_message(
2344 mut self,
2345 handler: impl Fn(
2346 acp::PromptRequest,
2347 WeakEntity<AcpThread>,
2348 AsyncApp,
2349 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2350 + 'static,
2351 ) -> Self {
2352 self.on_user_message.replace(Rc::new(handler));
2353 self
2354 }
2355 }
2356
2357 impl AgentConnection for FakeAgentConnection {
2358 fn auth_methods(&self) -> &[acp::AuthMethod] {
2359 &self.auth_methods
2360 }
2361
2362 fn new_thread(
2363 self: Rc<Self>,
2364 project: Entity<Project>,
2365 _cwd: &Path,
2366 cx: &mut App,
2367 ) -> Task<gpui::Result<Entity<AcpThread>>> {
2368 let session_id = acp::SessionId(
2369 rand::thread_rng()
2370 .sample_iter(&rand::distributions::Alphanumeric)
2371 .take(7)
2372 .map(char::from)
2373 .collect::<String>()
2374 .into(),
2375 );
2376 let action_log = cx.new(|_| ActionLog::new(project.clone()));
2377 let thread = cx.new(|_cx| {
2378 AcpThread::new(
2379 "Test",
2380 self.clone(),
2381 project,
2382 action_log,
2383 session_id.clone(),
2384 )
2385 });
2386 self.sessions.lock().insert(session_id, thread.downgrade());
2387 Task::ready(Ok(thread))
2388 }
2389
2390 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2391 if self.auth_methods().iter().any(|m| m.id == method) {
2392 Task::ready(Ok(()))
2393 } else {
2394 Task::ready(Err(anyhow!("Invalid Auth Method")))
2395 }
2396 }
2397
2398 fn prompt(
2399 &self,
2400 _id: Option<UserMessageId>,
2401 params: acp::PromptRequest,
2402 cx: &mut App,
2403 ) -> Task<gpui::Result<acp::PromptResponse>> {
2404 let sessions = self.sessions.lock();
2405 let thread = sessions.get(¶ms.session_id).unwrap();
2406 if let Some(handler) = &self.on_user_message {
2407 let handler = handler.clone();
2408 let thread = thread.clone();
2409 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2410 } else {
2411 Task::ready(Ok(acp::PromptResponse {
2412 stop_reason: acp::StopReason::EndTurn,
2413 }))
2414 }
2415 }
2416
2417 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2418 let sessions = self.sessions.lock();
2419 let thread = sessions.get(session_id).unwrap().clone();
2420
2421 cx.spawn(async move |cx| {
2422 thread
2423 .update(cx, |thread, cx| thread.cancel(cx))
2424 .unwrap()
2425 .await
2426 })
2427 .detach();
2428 }
2429
2430 fn session_editor(
2431 &self,
2432 session_id: &acp::SessionId,
2433 _cx: &mut App,
2434 ) -> Option<Rc<dyn AgentSessionEditor>> {
2435 Some(Rc::new(FakeAgentSessionEditor {
2436 _session_id: session_id.clone(),
2437 }))
2438 }
2439
2440 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2441 self
2442 }
2443 }
2444
2445 struct FakeAgentSessionEditor {
2446 _session_id: acp::SessionId,
2447 }
2448
2449 impl AgentSessionEditor for FakeAgentSessionEditor {
2450 fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2451 Task::ready(Ok(()))
2452 }
2453 }
2454}