1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use collections::HashSet;
7pub use connection::*;
8pub use diff::*;
9use language::language_settings::FormatOnSave;
10pub use mention::*;
11use project::lsp_store::{FormatTrigger, LspFormatTarget};
12use serde::{Deserialize, Serialize};
13pub use terminal::*;
14
15use action_log::ActionLog;
16use agent_client_protocol as acp;
17use anyhow::{Context as _, Result, anyhow};
18use editor::Bias;
19use futures::{FutureExt, channel::oneshot, future::BoxFuture};
20use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
21use itertools::Itertools;
22use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
23use markdown::Markdown;
24use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
25use std::collections::HashMap;
26use std::error::Error;
27use std::fmt::{Formatter, Write};
28use std::ops::Range;
29use std::process::ExitStatus;
30use std::rc::Rc;
31use std::time::{Duration, Instant};
32use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
33use ui::App;
34use util::ResultExt;
35
/// A message authored by the user within a thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, if one has been assigned.
    pub id: Option<UserMessageId>,
    /// Rendered content, accumulated from every chunk appended so far.
    pub content: ContentBlock,
    /// The raw protocol content blocks, in the order they were pushed.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint associated with this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
43
/// A git-store checkpoint attached to a user message.
#[derive(Debug)]
pub struct Checkpoint {
    // The underlying checkpoint handle from the project's git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message is labelled `(checkpoint)` in its
    /// markdown rendering.
    pub show: bool,
}
49
50impl UserMessage {
51 fn to_markdown(&self, cx: &App) -> String {
52 let mut markdown = String::new();
53 if self
54 .checkpoint
55 .as_ref()
56 .is_some_and(|checkpoint| checkpoint.show)
57 {
58 writeln!(markdown, "## User (checkpoint)").unwrap();
59 } else {
60 writeln!(markdown, "## User").unwrap();
61 }
62 writeln!(markdown).unwrap();
63 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
64 writeln!(markdown).unwrap();
65 markdown
66 }
67}
68
/// A message produced by the assistant, stored as an ordered list of chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Message and thought chunks, in the order they were received.
    pub chunks: Vec<AssistantMessageChunk>,
}
73
74impl AssistantMessage {
75 pub fn to_markdown(&self, cx: &App) -> String {
76 format!(
77 "## Assistant\n\n{}\n\n",
78 self.chunks
79 .iter()
80 .map(|chunk| chunk.to_markdown(cx))
81 .join("\n\n")
82 )
83 }
84}
85
/// One piece of an assistant message.
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message output.
    Message { block: ContentBlock },
    /// Thought output; rendered inside `<thinking>` tags in markdown.
    Thought { block: ContentBlock },
}
91
92impl AssistantMessageChunk {
93 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
94 Self::Message {
95 block: ContentBlock::new(chunk.into(), language_registry, cx),
96 }
97 }
98
99 fn to_markdown(&self, cx: &App) -> String {
100 match self {
101 Self::Message { block } => block.to_markdown(cx).to_string(),
102 Self::Thought { block } => {
103 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
104 }
105 }
106 }
107}
108
/// A single entry in a thread: a user message, an assistant message, or a
/// tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
115
116impl AgentThreadEntry {
117 pub fn to_markdown(&self, cx: &App) -> String {
118 match self {
119 Self::UserMessage(message) => message.to_markdown(cx),
120 Self::AssistantMessage(message) => message.to_markdown(cx),
121 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
122 }
123 }
124
125 pub fn user_message(&self) -> Option<&UserMessage> {
126 if let AgentThreadEntry::UserMessage(message) = self {
127 Some(message)
128 } else {
129 None
130 }
131 }
132
133 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
134 if let AgentThreadEntry::ToolCall(call) = self {
135 itertools::Either::Left(call.diffs())
136 } else {
137 itertools::Either::Right(std::iter::empty())
138 }
139 }
140
141 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
142 if let AgentThreadEntry::ToolCall(call) = self {
143 itertools::Either::Left(call.terminals())
144 } else {
145 itertools::Either::Right(std::iter::empty())
146 }
147 }
148
149 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
150 if let AgentThreadEntry::ToolCall(ToolCall {
151 locations,
152 resolved_locations,
153 ..
154 }) = self
155 {
156 Some((
157 locations.get(ix)?.clone(),
158 resolved_locations.get(ix)?.clone()?,
159 ))
160 } else {
161 None
162 }
163 }
164}
165
/// A tool call made by the agent, as tracked by the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier used to match later updates to this call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the call's title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    /// Content produced by the call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    /// Locations reported by the agent for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Result of resolving `locations`; an entry is `None` when the
    /// corresponding location could not be resolved.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    pub raw_input: Option<serde_json::Value>,
    pub raw_output: Option<serde_json::Value>,
}
178
179impl ToolCall {
180 fn from_acp(
181 tool_call: acp::ToolCall,
182 status: ToolCallStatus,
183 language_registry: Arc<LanguageRegistry>,
184 cx: &mut App,
185 ) -> Self {
186 Self {
187 id: tool_call.id,
188 label: cx.new(|cx| {
189 Markdown::new(
190 tool_call.title.into(),
191 Some(language_registry.clone()),
192 None,
193 cx,
194 )
195 }),
196 kind: tool_call.kind,
197 content: tool_call
198 .content
199 .into_iter()
200 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
201 .collect(),
202 locations: tool_call.locations,
203 resolved_locations: Vec::default(),
204 status,
205 raw_input: tool_call.raw_input,
206 raw_output: tool_call.raw_output,
207 }
208 }
209
210 fn update_fields(
211 &mut self,
212 fields: acp::ToolCallUpdateFields,
213 language_registry: Arc<LanguageRegistry>,
214 cx: &mut App,
215 ) {
216 let acp::ToolCallUpdateFields {
217 kind,
218 status,
219 title,
220 content,
221 locations,
222 raw_input,
223 raw_output,
224 } = fields;
225
226 if let Some(kind) = kind {
227 self.kind = kind;
228 }
229
230 if let Some(status) = status {
231 self.status = status.into();
232 }
233
234 if let Some(title) = title {
235 self.label.update(cx, |label, cx| {
236 label.replace(title, cx);
237 });
238 }
239
240 if let Some(content) = content {
241 self.content = content
242 .into_iter()
243 .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
244 .collect();
245 }
246
247 if let Some(locations) = locations {
248 self.locations = locations;
249 }
250
251 if let Some(raw_input) = raw_input {
252 self.raw_input = Some(raw_input);
253 }
254
255 if let Some(raw_output) = raw_output {
256 if self.content.is_empty()
257 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
258 {
259 self.content
260 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
261 markdown,
262 }));
263 }
264 self.raw_output = Some(raw_output);
265 }
266 }
267
268 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
269 self.content.iter().filter_map(|content| match content {
270 ToolCallContent::Diff(diff) => Some(diff),
271 ToolCallContent::ContentBlock(_) => None,
272 ToolCallContent::Terminal(_) => None,
273 })
274 }
275
276 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
277 self.content.iter().filter_map(|content| match content {
278 ToolCallContent::Terminal(terminal) => Some(terminal),
279 ToolCallContent::ContentBlock(_) => None,
280 ToolCallContent::Diff(_) => None,
281 })
282 }
283
284 fn to_markdown(&self, cx: &App) -> String {
285 let mut markdown = format!(
286 "**Tool Call: {}**\nStatus: {}\n\n",
287 self.label.read(cx).source(),
288 self.status
289 );
290 for content in &self.content {
291 markdown.push_str(content.to_markdown(cx).as_str());
292 markdown.push_str("\n\n");
293 }
294 markdown
295 }
296
297 async fn resolve_location(
298 location: acp::ToolCallLocation,
299 project: WeakEntity<Project>,
300 cx: &mut AsyncApp,
301 ) -> Option<AgentLocation> {
302 let buffer = project
303 .update(cx, |project, cx| {
304 if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
305 Some(project.open_buffer(path, cx))
306 } else {
307 None
308 }
309 })
310 .ok()??;
311 let buffer = buffer.await.log_err()?;
312 let position = buffer
313 .update(cx, |buffer, _| {
314 if let Some(row) = location.line {
315 let snapshot = buffer.snapshot();
316 let column = snapshot.indent_size_for_line(row).len;
317 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
318 snapshot.anchor_before(point)
319 } else {
320 Anchor::MIN
321 }
322 })
323 .ok()?;
324
325 Some(AgentLocation {
326 buffer: buffer.downgrade(),
327 position,
328 })
329 }
330
331 fn resolve_locations(
332 &self,
333 project: Entity<Project>,
334 cx: &mut App,
335 ) -> Task<Vec<Option<AgentLocation>>> {
336 let locations = self.locations.clone();
337 project.update(cx, |_, cx| {
338 cx.spawn(async move |project, cx| {
339 let mut new_locations = Vec::new();
340 for location in locations {
341 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
342 }
343 new_locations
344 })
345 })
346 }
347}
348
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options the user can choose from.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the chosen option id is sent back.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
370
371impl From<acp::ToolCallStatus> for ToolCallStatus {
372 fn from(status: acp::ToolCallStatus) -> Self {
373 match status {
374 acp::ToolCallStatus::Pending => Self::Pending,
375 acp::ToolCallStatus::InProgress => Self::InProgress,
376 acp::ToolCallStatus::Completed => Self::Completed,
377 acp::ToolCallStatus::Failed => Self::Failed,
378 }
379 }
380}
381
382impl Display for ToolCallStatus {
383 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
384 write!(
385 f,
386 "{}",
387 match self {
388 ToolCallStatus::Pending => "Pending",
389 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
390 ToolCallStatus::InProgress => "In Progress",
391 ToolCallStatus::Completed => "Completed",
392 ToolCallStatus::Failed => "Failed",
393 ToolCallStatus::Rejected => "Rejected",
394 ToolCallStatus::Canceled => "Canceled",
395 }
396 )
397 }
398}
399
/// Displayable content accumulated from one or more protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Content rendered through a markdown entity.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link; converted to markdown once more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
406
407impl ContentBlock {
408 pub fn new(
409 block: acp::ContentBlock,
410 language_registry: &Arc<LanguageRegistry>,
411 cx: &mut App,
412 ) -> Self {
413 let mut this = Self::Empty;
414 this.append(block, language_registry, cx);
415 this
416 }
417
418 pub fn new_combined(
419 blocks: impl IntoIterator<Item = acp::ContentBlock>,
420 language_registry: Arc<LanguageRegistry>,
421 cx: &mut App,
422 ) -> Self {
423 let mut this = Self::Empty;
424 for block in blocks {
425 this.append(block, &language_registry, cx);
426 }
427 this
428 }
429
430 pub fn append(
431 &mut self,
432 block: acp::ContentBlock,
433 language_registry: &Arc<LanguageRegistry>,
434 cx: &mut App,
435 ) {
436 if matches!(self, ContentBlock::Empty)
437 && let acp::ContentBlock::ResourceLink(resource_link) = block
438 {
439 *self = ContentBlock::ResourceLink { resource_link };
440 return;
441 }
442
443 let new_content = self.block_string_contents(block);
444
445 match self {
446 ContentBlock::Empty => {
447 *self = Self::create_markdown_block(new_content, language_registry, cx);
448 }
449 ContentBlock::Markdown { markdown } => {
450 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
451 }
452 ContentBlock::ResourceLink { resource_link } => {
453 let existing_content = Self::resource_link_md(&resource_link.uri);
454 let combined = format!("{}\n{}", existing_content, new_content);
455
456 *self = Self::create_markdown_block(combined, language_registry, cx);
457 }
458 }
459 }
460
461 fn create_markdown_block(
462 content: String,
463 language_registry: &Arc<LanguageRegistry>,
464 cx: &mut App,
465 ) -> ContentBlock {
466 ContentBlock::Markdown {
467 markdown: cx
468 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
469 }
470 }
471
472 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
473 match block {
474 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
475 acp::ContentBlock::ResourceLink(resource_link) => {
476 Self::resource_link_md(&resource_link.uri)
477 }
478 acp::ContentBlock::Resource(acp::EmbeddedResource {
479 resource:
480 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
481 uri,
482 ..
483 }),
484 ..
485 }) => Self::resource_link_md(&uri),
486 acp::ContentBlock::Image(image) => Self::image_md(&image),
487 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
488 }
489 }
490
491 fn resource_link_md(uri: &str) -> String {
492 if let Some(uri) = MentionUri::parse(uri).log_err() {
493 uri.as_link().to_string()
494 } else {
495 uri.to_string()
496 }
497 }
498
499 fn image_md(_image: &acp::ImageContent) -> String {
500 "`Image`".into()
501 }
502
503 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
504 match self {
505 ContentBlock::Empty => "",
506 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
507 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
508 }
509 }
510
511 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
512 match self {
513 ContentBlock::Empty => None,
514 ContentBlock::Markdown { markdown } => Some(markdown),
515 ContentBlock::ResourceLink { .. } => None,
516 }
517 }
518
519 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
520 match self {
521 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
522 _ => None,
523 }
524 }
525}
526
/// One item of content attached to a tool call.
#[derive(Debug)]
pub enum ToolCallContent {
    ContentBlock(ContentBlock),
    Diff(Entity<Diff>),
    Terminal(Entity<Terminal>),
}
533
534impl ToolCallContent {
535 pub fn from_acp(
536 content: acp::ToolCallContent,
537 language_registry: Arc<LanguageRegistry>,
538 cx: &mut App,
539 ) -> Self {
540 match content {
541 acp::ToolCallContent::Content { content } => {
542 Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
543 }
544 acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
545 Diff::finalized(
546 diff.path,
547 diff.old_text,
548 diff.new_text,
549 language_registry,
550 cx,
551 )
552 })),
553 }
554 }
555
556 pub fn to_markdown(&self, cx: &App) -> String {
557 match self {
558 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
559 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
560 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
561 }
562 }
563}
564
/// An update addressed to an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// Update individual fields from a protocol update.
    UpdateFields(acp::ToolCallUpdate),
    /// Replace the call's content with a diff.
    UpdateDiff(ToolCallUpdateDiff),
    /// Replace the call's content with a terminal.
    UpdateTerminal(ToolCallUpdateTerminal),
}
571
572impl ToolCallUpdate {
573 fn id(&self) -> &acp::ToolCallId {
574 match self {
575 Self::UpdateFields(update) => &update.id,
576 Self::UpdateDiff(diff) => &diff.id,
577 Self::UpdateTerminal(terminal) => &terminal.id,
578 }
579 }
580}
581
582impl From<acp::ToolCallUpdate> for ToolCallUpdate {
583 fn from(update: acp::ToolCallUpdate) -> Self {
584 Self::UpdateFields(update)
585 }
586}
587
588impl From<ToolCallUpdateDiff> for ToolCallUpdate {
589 fn from(diff: ToolCallUpdateDiff) -> Self {
590 Self::UpdateDiff(diff)
591 }
592}
593
/// Update that sets a diff as the content of the tool call with the given id.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the targeted tool call.
    pub id: acp::ToolCallId,
    /// The diff to show for that call.
    pub diff: Entity<Diff>,
}
599
600impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
601 fn from(terminal: ToolCallUpdateTerminal) -> Self {
602 Self::UpdateTerminal(terminal)
603 }
604}
605
/// Update that sets a terminal as the content of the tool call with the given id.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the targeted tool call.
    pub id: acp::ToolCallId,
    /// The terminal to show for that call.
    pub terminal: Entity<Terminal>,
}
611
/// The agent's current plan: an ordered list of entries.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
616
/// Summary of a [`Plan`], as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
623
624impl Plan {
625 pub fn is_empty(&self) -> bool {
626 self.entries.is_empty()
627 }
628
629 pub fn stats(&self) -> PlanStats<'_> {
630 let mut stats = PlanStats {
631 in_progress_entry: None,
632 pending: 0,
633 completed: 0,
634 };
635
636 for entry in &self.entries {
637 match &entry.status {
638 acp::PlanEntryStatus::Pending => {
639 stats.pending += 1;
640 }
641 acp::PlanEntryStatus::InProgress => {
642 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
643 }
644 acp::PlanEntryStatus::Completed => {
645 stats.completed += 1;
646 }
647 }
648 }
649
650 stats
651 }
652}
653
/// A single step of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    pub priority: acp::PlanEntryPriority,
    pub status: acp::PlanEntryStatus,
}
660
661impl PlanEntry {
662 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
663 Self {
664 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
665 priority: entry.priority,
666 status: entry.status,
667 }
668 }
669}
670
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Upper bound on tokens. NOTE(review): presumably the model's context
    /// window — confirm against the producer of this value.
    pub max_tokens: u64,
    /// Tokens used so far.
    pub used_tokens: u64,
}
676
/// Information about an in-flight retry, carried by `AcpThreadEvent::Retry`.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the error that triggered the retry.
    pub last_error: SharedString,
    /// Current attempt number.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    /// When the retry started.
    pub started_at: Instant,
    /// NOTE(review): presumably the delay before the next attempt — confirm
    /// against the code that constructs this value.
    pub duration: Duration,
}
685
/// A conversation thread with an agent: its entries, plan, and the
/// connection/session used to talk to the agent.
pub struct AcpThread {
    title: SharedString,
    // User messages, assistant messages and tool calls, oldest first.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    // `Some` while a send is in flight; `status()` reports `Idle` when `None`.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
    token_usage: Option<TokenUsage>,
}
698
/// Events emitted by an [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The thread title changed.
    TitleUpdated,
    /// The token usage was replaced.
    TokenUsageUpdated,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    /// Entries in the given index range were removed.
    EntriesRemoved(Range<usize>),
    /// A tool call is waiting for the user's authorization.
    ToolAuthorizationRequired,
    /// A retry is underway; carries its status.
    Retry(RetryStatus),
    Stopped,
    Error,
    LoadError(LoadError),
}
712
// Allows `AcpThread` to emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
714
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send is in flight.
    Idle,
    /// A send is in flight and a tool call in the current turn awaits confirmation.
    WaitingForToolConfirmation,
    /// A send is in flight and nothing is awaiting confirmation.
    Generating,
}
721
/// Errors carried by `AcpThreadEvent::LoadError`.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// NOTE(review): presumably the agent binary is missing — confirm with
    /// the code that constructs this variant.
    NotInstalled {
        /// Message shown when the error is displayed.
        error_message: SharedString,
        install_message: SharedString,
        install_command: String,
    },
    /// NOTE(review): presumably the installed agent version is not
    /// supported — confirm with the code that constructs this variant.
    Unsupported {
        /// Message shown when the error is displayed.
        error_message: SharedString,
        upgrade_message: SharedString,
        upgrade_command: String,
    },
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by a message.
    Other(SharedString),
}
739
740impl Display for LoadError {
741 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
742 match self {
743 LoadError::NotInstalled { error_message, .. }
744 | LoadError::Unsupported { error_message, .. } => {
745 write!(f, "{error_message}")
746 }
747 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
748 LoadError::Other(msg) => write!(f, "{}", msg),
749 }
750 }
751}
752
// Marks `LoadError` as a standard error type; all trait methods use their defaults.
impl Error for LoadError {}
754
755impl AcpThread {
756 pub fn new(
757 title: impl Into<SharedString>,
758 connection: Rc<dyn AgentConnection>,
759 project: Entity<Project>,
760 action_log: Entity<ActionLog>,
761 session_id: acp::SessionId,
762 ) -> Self {
763 Self {
764 action_log,
765 shared_buffers: Default::default(),
766 entries: Default::default(),
767 plan: Default::default(),
768 title: title.into(),
769 project,
770 send_task: None,
771 connection,
772 session_id,
773 token_usage: None,
774 }
775 }
776
    /// Returns the agent connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
780
    /// Returns the action log associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
784
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
788
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
792
    /// Returns all entries of the thread, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
796
    /// Returns the id of the agent session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
800
801 pub fn status(&self) -> ThreadStatus {
802 if self.send_task.is_some() {
803 if self.waiting_for_tool_confirmation() {
804 ThreadStatus::WaitingForToolConfirmation
805 } else {
806 ThreadStatus::Generating
807 }
808 } else {
809 ThreadStatus::Idle
810 }
811 }
812
    /// Returns the most recently recorded token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
816
817 pub fn has_pending_edit_tool_calls(&self) -> bool {
818 for entry in self.entries.iter().rev() {
819 match entry {
820 AgentThreadEntry::UserMessage(_) => return false,
821 AgentThreadEntry::ToolCall(
822 call @ ToolCall {
823 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
824 ..
825 },
826 ) if call.diffs().next().is_some() => {
827 return true;
828 }
829 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
830 }
831 }
832
833 false
834 }
835
836 pub fn used_tools_since_last_user_message(&self) -> bool {
837 for entry in self.entries.iter().rev() {
838 match entry {
839 AgentThreadEntry::UserMessage(..) => return false,
840 AgentThreadEntry::AssistantMessage(..) => continue,
841 AgentThreadEntry::ToolCall(..) => return true,
842 }
843 }
844
845 false
846 }
847
848 pub fn handle_session_update(
849 &mut self,
850 update: acp::SessionUpdate,
851 cx: &mut Context<Self>,
852 ) -> Result<(), acp::Error> {
853 match update {
854 acp::SessionUpdate::UserMessageChunk { content } => {
855 self.push_user_content_block(None, content, cx);
856 }
857 acp::SessionUpdate::AgentMessageChunk { content } => {
858 self.push_assistant_content_block(content, false, cx);
859 }
860 acp::SessionUpdate::AgentThoughtChunk { content } => {
861 self.push_assistant_content_block(content, true, cx);
862 }
863 acp::SessionUpdate::ToolCall(tool_call) => {
864 self.upsert_tool_call(tool_call, cx)?;
865 }
866 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
867 self.update_tool_call(tool_call_update, cx)?;
868 }
869 acp::SessionUpdate::Plan(plan) => {
870 self.update_plan(plan, cx);
871 }
872 }
873 Ok(())
874 }
875
876 pub fn push_user_content_block(
877 &mut self,
878 message_id: Option<UserMessageId>,
879 chunk: acp::ContentBlock,
880 cx: &mut Context<Self>,
881 ) {
882 let language_registry = self.project.read(cx).languages().clone();
883 let entries_len = self.entries.len();
884
885 if let Some(last_entry) = self.entries.last_mut()
886 && let AgentThreadEntry::UserMessage(UserMessage {
887 id,
888 content,
889 chunks,
890 ..
891 }) = last_entry
892 {
893 *id = message_id.or(id.take());
894 content.append(chunk.clone(), &language_registry, cx);
895 chunks.push(chunk);
896 let idx = entries_len - 1;
897 cx.emit(AcpThreadEvent::EntryUpdated(idx));
898 } else {
899 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
900 self.push_entry(
901 AgentThreadEntry::UserMessage(UserMessage {
902 id: message_id,
903 content,
904 chunks: vec![chunk],
905 checkpoint: None,
906 }),
907 cx,
908 );
909 }
910 }
911
    /// Adds a chunk of assistant output to the thread.
    ///
    /// `is_thought` selects whether the chunk is a thought or regular message
    /// text. When the last entry is an assistant message, the chunk is merged
    /// into its last chunk if that one is of the same kind, otherwise a new
    /// chunk is added to that message; `EntryUpdated` is emitted either way.
    /// When the last entry is not an assistant message, a new assistant
    /// message entry is pushed.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        let language_registry = self.project.read(cx).languages().clone();
        // Captured before `last_mut` borrows `self.entries`.
        let entries_len = self.entries.len();
        if let Some(last_entry) = self.entries.last_mut()
            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
        {
            let idx = entries_len - 1;
            cx.emit(AcpThreadEvent::EntryUpdated(idx));
            match (chunks.last_mut(), is_thought) {
                // Same kind as the previous chunk: extend it in place.
                (Some(AssistantMessageChunk::Message { block }), false)
                | (Some(AssistantMessageChunk::Thought { block }), true) => {
                    block.append(chunk, &language_registry, cx)
                }
                // Kind changed (or no chunk yet): start a new chunk.
                _ => {
                    let block = ContentBlock::new(chunk, &language_registry, cx);
                    if is_thought {
                        chunks.push(AssistantMessageChunk::Thought { block })
                    } else {
                        chunks.push(AssistantMessageChunk::Message { block })
                    }
                }
            }
        } else {
            let block = ContentBlock::new(chunk, &language_registry, cx);
            let chunk = if is_thought {
                AssistantMessageChunk::Thought { block }
            } else {
                AssistantMessageChunk::Message { block }
            };

            self.push_entry(
                AgentThreadEntry::AssistantMessage(AssistantMessage {
                    chunks: vec![chunk],
                }),
                cx,
            );
        }
    }
955
    /// Appends `entry` to the thread and emits `NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
960
    /// Replaces the thread title and emits `TitleUpdated`.
    ///
    /// Currently always returns `Ok(())`.
    pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
        self.title = title;
        cx.emit(AcpThreadEvent::TitleUpdated);
        Ok(())
    }
966
    /// Replaces the recorded token usage (`None` clears it) and emits
    /// `TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
971
    /// Emits a `Retry` event carrying `status`; no thread state is changed.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
975
    /// Applies an update to an existing tool call and emits `EntryUpdated`.
    ///
    /// Field updates are merged into the call (re-resolving locations when
    /// the update carries new ones); diff and terminal updates replace the
    /// call's entire content with that single item.
    ///
    /// # Errors
    ///
    /// Returns an error if no tool call with the update's id exists.
    pub fn update_tool_call(
        &mut self,
        update: impl Into<ToolCallUpdate>,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let update = update.into();
        let languages = self.project.read(cx).languages().clone();

        let (ix, current_call) = self
            .tool_call_mut(update.id())
            .context("Tool call not found")?;
        match update {
            ToolCallUpdate::UpdateFields(update) => {
                // Checked before `update.fields` is moved into `update_fields`.
                let location_updated = update.fields.locations.is_some();
                current_call.update_fields(update.fields, languages, cx);
                if location_updated {
                    self.resolve_locations(update.id.clone(), cx);
                }
            }
            ToolCallUpdate::UpdateDiff(update) => {
                current_call.content.clear();
                current_call
                    .content
                    .push(ToolCallContent::Diff(update.diff));
            }
            ToolCallUpdate::UpdateTerminal(update) => {
                current_call.content.clear();
                current_call
                    .content
                    .push(ToolCallContent::Terminal(update.terminal));
            }
        }

        cx.emit(AcpThreadEvent::EntryUpdated(ix));

        Ok(())
    }
1013
    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
    ///
    /// The call's own status is converted to the thread-level status and the
    /// rest is delegated to `upsert_tool_call_inner`.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        // Read the status first: `tool_call` is consumed by `into()` below.
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1023
    /// Updates the tool call matching the update's id and sets its status to
    /// `status`; if no entry matches, converts the update into a full tool
    /// call and pushes it as a new entry. Locations are (re-)resolved in
    /// both cases.
    ///
    /// # Errors
    ///
    /// When no existing entry matches, fails if the update cannot be
    /// converted into a complete tool call.
    pub fn upsert_tool_call_inner(
        &mut self,
        tool_call_update: acp::ToolCallUpdate,
        status: ToolCallStatus,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        let language_registry = self.project.read(cx).languages().clone();
        let id = tool_call_update.id.clone();

        if let Some((ix, current_call)) = self.tool_call_mut(&id) {
            current_call.update_fields(tool_call_update.fields, language_registry, cx);
            // The explicit status wins over any status carried in the fields.
            current_call.status = status;

            cx.emit(AcpThreadEvent::EntryUpdated(ix));
        } else {
            let call =
                ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
        };

        self.resolve_locations(id, cx);
        Ok(())
    }
1048
1049 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1050 // The tool call we are looking for is typically the last one, or very close to the end.
1051 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1052 self.entries
1053 .iter_mut()
1054 .enumerate()
1055 .rev()
1056 .find_map(|(index, tool_call)| {
1057 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1058 && &tool_call.id == id
1059 {
1060 Some((index, tool_call))
1061 } else {
1062 None
1063 }
1064 })
1065 }
1066
1067 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1068 self.entries
1069 .iter()
1070 .enumerate()
1071 .rev()
1072 .find_map(|(index, tool_call)| {
1073 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1074 && &tool_call.id == id
1075 {
1076 Some((index, tool_call))
1077 } else {
1078 None
1079 }
1080 })
1081 }
1082
    /// Resolves the locations of the tool call with the given id in the
    /// background and stores the result on the call.
    ///
    /// Does nothing if no such tool call exists (either now or once the
    /// resolution finishes). After resolution, the last resolved location may
    /// become the project's agent location, and `EntryUpdated` is emitted if
    /// the resolved locations changed.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;
            this.update(cx, |this, cx| {
                let project = this.project.clone();
                // Look the call up again: entries may have changed while resolving.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };
                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        // The location is only replaced when one is already set.
                        if let Some(agent_location) = project.agent_location() {
                            let should_ignore = agent_location.buffer == location.buffer
                                && location
                                    .buffer
                                    .update(cx, |buffer, _| {
                                        let snapshot = buffer.snapshot();
                                        let old_position =
                                            agent_location.position.to_point(&snapshot);
                                        let new_position = location.position.to_point(&snapshot);
                                        // ignore this so that when we get updates from the edit tool
                                        // the position doesn't reset to the start of the line
                                        old_position.row == new_position.row
                                            && old_position.column > new_position.column
                                    })
                                    .ok()
                                    .unwrap_or_default();
                            if !should_ignore {
                                project.set_agent_location(Some(location.clone()), cx);
                            }
                        }
                    });
                }
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1128
1129 pub fn request_tool_call_authorization(
1130 &mut self,
1131 tool_call: acp::ToolCallUpdate,
1132 options: Vec<acp::PermissionOption>,
1133 cx: &mut Context<Self>,
1134 ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1135 let (tx, rx) = oneshot::channel();
1136
1137 let status = ToolCallStatus::WaitingForConfirmation {
1138 options,
1139 respond_tx: tx,
1140 };
1141
1142 self.upsert_tool_call_inner(tool_call, status, cx)?;
1143 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1144 Ok(rx)
1145 }
1146
1147 pub fn authorize_tool_call(
1148 &mut self,
1149 id: acp::ToolCallId,
1150 option_id: acp::PermissionOptionId,
1151 option_kind: acp::PermissionOptionKind,
1152 cx: &mut Context<Self>,
1153 ) {
1154 let Some((ix, call)) = self.tool_call_mut(&id) else {
1155 return;
1156 };
1157
1158 let new_status = match option_kind {
1159 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1160 ToolCallStatus::Rejected
1161 }
1162 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1163 ToolCallStatus::InProgress
1164 }
1165 };
1166
1167 let curr_status = mem::replace(&mut call.status, new_status);
1168
1169 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1170 respond_tx.send(option_id).log_err();
1171 } else if cfg!(debug_assertions) {
1172 panic!("tried to authorize an already authorized tool call");
1173 }
1174
1175 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1176 }
1177
1178 /// Returns true if the last turn is awaiting tool authorization
1179 pub fn waiting_for_tool_confirmation(&self) -> bool {
1180 for entry in self.entries.iter().rev() {
1181 match &entry {
1182 AgentThreadEntry::ToolCall(call) => match call.status {
1183 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1184 ToolCallStatus::Pending
1185 | ToolCallStatus::InProgress
1186 | ToolCallStatus::Completed
1187 | ToolCallStatus::Failed
1188 | ToolCallStatus::Rejected
1189 | ToolCallStatus::Canceled => continue,
1190 },
1191 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1192 // Reached the beginning of the turn
1193 return false;
1194 }
1195 }
1196 }
1197 false
1198 }
1199
/// Returns a reference to this thread's current plan.
pub fn plan(&self) -> &Plan {
    &self.plan
}
1203
1204 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1205 let new_entries_len = request.entries.len();
1206 let mut new_entries = request.entries.into_iter();
1207
1208 // Reuse existing markdown to prevent flickering
1209 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1210 let PlanEntry {
1211 content,
1212 priority,
1213 status,
1214 } = old;
1215 content.update(cx, |old, cx| {
1216 old.replace(new.content, cx);
1217 });
1218 *priority = new.priority;
1219 *status = new.status;
1220 }
1221 for new in new_entries {
1222 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1223 }
1224 self.plan.entries.truncate(new_entries_len);
1225
1226 cx.notify();
1227 }
1228
1229 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1230 self.plan
1231 .entries
1232 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1233 cx.notify();
1234 }
1235
/// Test helper: sends `message` as a single text content block without
/// annotations, via [`Self::send`].
#[cfg(any(test, feature = "test-support"))]
pub fn send_raw(
    &mut self,
    message: &str,
    cx: &mut Context<Self>,
) -> BoxFuture<'static, Result<()>> {
    let text_block = acp::ContentBlock::Text(acp::TextContent {
        text: message.to_string(),
        annotations: None,
    });
    self.send(vec![text_block], cx)
}
1250
1251 pub fn send(
1252 &mut self,
1253 message: Vec<acp::ContentBlock>,
1254 cx: &mut Context<Self>,
1255 ) -> BoxFuture<'static, Result<()>> {
1256 let block = ContentBlock::new_combined(
1257 message.clone(),
1258 self.project.read(cx).languages().clone(),
1259 cx,
1260 );
1261 let request = acp::PromptRequest {
1262 prompt: message.clone(),
1263 session_id: self.session_id.clone(),
1264 };
1265 let git_store = self.project.read(cx).git_store().clone();
1266
1267 let message_id = if self
1268 .connection
1269 .session_editor(&self.session_id, cx)
1270 .is_some()
1271 {
1272 Some(UserMessageId::new())
1273 } else {
1274 None
1275 };
1276
1277 self.run_turn(cx, async move |this, cx| {
1278 this.update(cx, |this, cx| {
1279 this.push_entry(
1280 AgentThreadEntry::UserMessage(UserMessage {
1281 id: message_id.clone(),
1282 content: block,
1283 chunks: message,
1284 checkpoint: None,
1285 }),
1286 cx,
1287 );
1288 })
1289 .ok();
1290
1291 let old_checkpoint = git_store
1292 .update(cx, |git, cx| git.checkpoint(cx))?
1293 .await
1294 .context("failed to get old checkpoint")
1295 .log_err();
1296 this.update(cx, |this, cx| {
1297 if let Some((_ix, message)) = this.last_user_message() {
1298 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1299 git_checkpoint,
1300 show: false,
1301 });
1302 }
1303 this.connection.prompt(message_id, request, cx)
1304 })?
1305 .await
1306 })
1307 }
1308
1309 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1310 self.run_turn(cx, async move |this, cx| {
1311 this.update(cx, |this, cx| {
1312 this.connection
1313 .resume(&this.session_id, cx)
1314 .map(|resume| resume.run(cx))
1315 })?
1316 .context("resuming a session is not supported")?
1317 .await
1318 })
1319 }
1320
/// Runs `f` as the thread's current turn and returns a future for its outcome.
///
/// Steps, in order:
/// 1. Completed plan entries are cleared and any in-flight turn is canceled.
/// 2. A task is stored in `send_task` that waits for the cancellation to
///    finish, runs `f`, and forwards its result over a oneshot channel.
/// 3. The returned future waits for that result, refreshes the last
///    checkpoint, clears the project's agent location, and then emits either
///    `Error` (returning the error) or `Stopped` (returning `Ok(())`).
///
/// A dropped sender (`Err` from the channel) is treated like a normal stop.
fn run_turn(
    &mut self,
    cx: &mut Context<Self>,
    f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
) -> BoxFuture<'static, Result<()>> {
    self.clear_completed_plan_entries(cx);

    let (tx, rx) = oneshot::channel();
    let cancel_task = self.cancel(cx);

    self.send_task = Some(cx.spawn(async move |this, cx| {
        // Let the previous turn wind down before starting the new one.
        cancel_task.await;
        tx.send(f(this, cx).await).ok();
    }));

    cx.spawn(async move |this, cx| {
        let response = rx.await;

        // A failure here returns early, before any Stopped/Error event is emitted.
        this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
            .await?;

        this.update(cx, |this, cx| {
            this.project
                .update(cx, |project, cx| project.set_agent_location(None, cx));
            match response {
                Ok(Err(e)) => {
                    this.send_task.take();
                    cx.emit(AcpThreadEvent::Error);
                    Err(e)
                }
                result => {
                    let canceled = matches!(
                        result,
                        Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Canceled
                        }))
                    );

                    // We only take the task if the current prompt wasn't canceled.
                    //
                    // This prompt may have been canceled because another one was sent
                    // while it was still generating. In these cases, dropping `send_task`
                    // would cause the next generation to be canceled.
                    if !canceled {
                        this.send_task.take();
                    }

                    cx.emit(AcpThreadEvent::Stopped);
                    Ok(())
                }
            }
        })?
    })
    .boxed()
}
1376
1377 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1378 let Some(send_task) = self.send_task.take() else {
1379 return Task::ready(());
1380 };
1381
1382 for entry in self.entries.iter_mut() {
1383 if let AgentThreadEntry::ToolCall(call) = entry {
1384 let cancel = matches!(
1385 call.status,
1386 ToolCallStatus::Pending
1387 | ToolCallStatus::WaitingForConfirmation { .. }
1388 | ToolCallStatus::InProgress
1389 );
1390
1391 if cancel {
1392 call.status = ToolCallStatus::Canceled;
1393 }
1394 }
1395 }
1396
1397 self.connection.cancel(&self.session_id, cx);
1398
1399 // Wait for the send task to complete
1400 cx.foreground_executor().spawn(send_task)
1401 }
1402
/// Rewinds this thread to before the user message identified by `id`, removing
/// it and all subsequent entries while reverting any changes made from that point.
///
/// Fails with "not supported" when the connection offers no session editor for
/// this session, and with "message not found" when no user message has `id`.
///
/// The spawned task restores the message's git checkpoint (when it has one),
/// asks the session editor to truncate at `id`, and finally truncates the local
/// entries and emits `EntriesRemoved` with the removed index range.
pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
    let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
        return Task::ready(Err(anyhow!("not supported")));
    };
    let Some(message) = self.user_message(&id) else {
        return Task::ready(Err(anyhow!("message not found")));
    };

    let checkpoint = message
        .checkpoint
        .as_ref()
        .map(|c| c.git_checkpoint.clone());

    let git_store = self.project.read(cx).git_store().clone();
    cx.spawn(async move |this, cx| {
        if let Some(checkpoint) = checkpoint {
            git_store
                .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
                .await?;
        }

        cx.update(|cx| session_editor.truncate(id.clone(), cx))?
            .await?;
        this.update(cx, |this, cx| {
            // Look the message up again: entries may have changed while awaiting.
            if let Some((ix, _)) = this.user_message_mut(&id) {
                let range = ix..this.entries.len();
                this.entries.truncate(ix);
                cx.emit(AcpThreadEvent::EntriesRemoved(range));
            }
        })
    })
}
1437
1438 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1439 let git_store = self.project.read(cx).git_store().clone();
1440
1441 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1442 if let Some(checkpoint) = message.checkpoint.as_ref() {
1443 checkpoint.git_checkpoint.clone()
1444 } else {
1445 return Task::ready(Ok(()));
1446 }
1447 } else {
1448 return Task::ready(Ok(()));
1449 };
1450
1451 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1452 cx.spawn(async move |this, cx| {
1453 let new_checkpoint = new_checkpoint
1454 .await
1455 .context("failed to get new checkpoint")
1456 .log_err();
1457 if let Some(new_checkpoint) = new_checkpoint {
1458 let equal = git_store
1459 .update(cx, |git, cx| {
1460 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1461 })?
1462 .await
1463 .unwrap_or(true);
1464 this.update(cx, |this, cx| {
1465 let (ix, message) = this.last_user_message().context("no user message")?;
1466 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1467 checkpoint.show = !equal;
1468 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1469 anyhow::Ok(())
1470 })??;
1471 }
1472
1473 Ok(())
1474 })
1475 }
1476
1477 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1478 self.entries
1479 .iter_mut()
1480 .enumerate()
1481 .rev()
1482 .find_map(|(ix, entry)| {
1483 if let AgentThreadEntry::UserMessage(message) = entry {
1484 Some((ix, message))
1485 } else {
1486 None
1487 }
1488 })
1489 }
1490
1491 fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1492 self.entries.iter().find_map(|entry| {
1493 if let AgentThreadEntry::UserMessage(message) = entry {
1494 if message.id.as_ref() == Some(id) {
1495 Some(message)
1496 } else {
1497 None
1498 }
1499 } else {
1500 None
1501 }
1502 })
1503 }
1504
1505 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1506 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1507 if let AgentThreadEntry::UserMessage(message) = entry {
1508 if message.id.as_ref() == Some(id) {
1509 Some((ix, message))
1510 } else {
1511 None
1512 }
1513 } else {
1514 None
1515 }
1516 })
1517 }
1518
1519 pub fn read_text_file(
1520 &self,
1521 path: PathBuf,
1522 line: Option<u32>,
1523 limit: Option<u32>,
1524 reuse_shared_snapshot: bool,
1525 cx: &mut Context<Self>,
1526 ) -> Task<Result<String>> {
1527 let project = self.project.clone();
1528 let action_log = self.action_log.clone();
1529 cx.spawn(async move |this, cx| {
1530 let load = project.update(cx, |project, cx| {
1531 let path = project
1532 .project_path_for_absolute_path(&path, cx)
1533 .context("invalid path")?;
1534 anyhow::Ok(project.open_buffer(path, cx))
1535 });
1536 let buffer = load??.await?;
1537
1538 let snapshot = if reuse_shared_snapshot {
1539 this.read_with(cx, |this, _| {
1540 this.shared_buffers.get(&buffer.clone()).cloned()
1541 })
1542 .log_err()
1543 .flatten()
1544 } else {
1545 None
1546 };
1547
1548 let snapshot = if let Some(snapshot) = snapshot {
1549 snapshot
1550 } else {
1551 action_log.update(cx, |action_log, cx| {
1552 action_log.buffer_read(buffer.clone(), cx);
1553 })?;
1554 project.update(cx, |project, cx| {
1555 let position = buffer
1556 .read(cx)
1557 .snapshot()
1558 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1559 project.set_agent_location(
1560 Some(AgentLocation {
1561 buffer: buffer.downgrade(),
1562 position,
1563 }),
1564 cx,
1565 );
1566 })?;
1567
1568 buffer.update(cx, |buffer, _| buffer.snapshot())?
1569 };
1570
1571 this.update(cx, |this, _| {
1572 let text = snapshot.text();
1573 this.shared_buffers.insert(buffer.clone(), snapshot);
1574 if line.is_none() && limit.is_none() {
1575 return Ok(text);
1576 }
1577 let limit = limit.unwrap_or(u32::MAX) as usize;
1578 let Some(line) = line else {
1579 return Ok(text.lines().take(limit).collect::<String>());
1580 };
1581
1582 let count = text.lines().count();
1583 if count < line as usize {
1584 anyhow::bail!("There are only {} lines", count);
1585 }
1586 Ok(text
1587 .lines()
1588 .skip(line as usize + 1)
1589 .take(limit)
1590 .collect::<String>())
1591 })?
1592 })
1593 }
1594
/// Replaces the contents of the file at `path` with `content` on behalf of the
/// agent and saves the buffer.
///
/// The edit is computed as a diff between `content` and the snapshot stored in
/// `shared_buffers` for this buffer (falling back to the buffer's current
/// snapshot), and is applied through anchors taken from that snapshot. The
/// diff is computed on the background executor.
///
/// Around the edit, the action log is told that the buffer was read and then
/// edited, and the project's agent location is moved to the end of the last
/// edit. If the buffer's language settings have format-on-save enabled, the
/// buffer is formatted (failures are logged) and reported as edited again
/// before being saved.
///
/// # Errors
///
/// Fails when `path` cannot be mapped to a project path, when the buffer
/// cannot be opened, or when saving fails.
pub fn write_text_file(
    &self,
    path: PathBuf,
    content: String,
    cx: &mut Context<Self>,
) -> Task<Result<()>> {
    let project = self.project.clone();
    let action_log = self.action_log.clone();
    cx.spawn(async move |this, cx| {
        let load = project.update(cx, |project, cx| {
            let path = project
                .project_path_for_absolute_path(&path, cx)
                .context("invalid path")?;
            anyhow::Ok(project.open_buffer(path, cx))
        });
        let buffer = load??.await?;
        // Diff against the stored snapshot for this buffer when there is one.
        let snapshot = this.update(cx, |this, cx| {
            this.shared_buffers
                .get(&buffer)
                .cloned()
                .unwrap_or_else(|| buffer.read(cx).snapshot())
        })?;
        let edits = cx
            .background_executor()
            .spawn(async move {
                let old_text = snapshot.text();
                text_diff(old_text.as_str(), &content)
                    .into_iter()
                    .map(|(range, replacement)| {
                        (
                            snapshot.anchor_after(range.start)
                                ..snapshot.anchor_before(range.end),
                            replacement,
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .await;

        project.update(cx, |project, cx| {
            project.set_agent_location(
                Some(AgentLocation {
                    buffer: buffer.downgrade(),
                    // End of the last edit, or the buffer start when nothing changed.
                    position: edits
                        .last()
                        .map(|(range, _)| range.end)
                        .unwrap_or(Anchor::MIN),
                }),
                cx,
            );
        })?;

        let format_on_save = cx.update(|cx| {
            action_log.update(cx, |action_log, cx| {
                action_log.buffer_read(buffer.clone(), cx);
            });

            let format_on_save = buffer.update(cx, |buffer, cx| {
                buffer.edit(edits, None, cx);

                let settings = language::language_settings::language_settings(
                    buffer.language().map(|l| l.name()),
                    buffer.file(),
                    cx,
                );

                settings.format_on_save != FormatOnSave::Off
            });
            action_log.update(cx, |action_log, cx| {
                action_log.buffer_edited(buffer.clone(), cx);
            });
            format_on_save
        })?;

        if format_on_save {
            let format_task = project.update(cx, |project, cx| {
                project.format(
                    HashSet::from_iter([buffer.clone()]),
                    LspFormatTarget::Buffers,
                    false,
                    FormatTrigger::Save,
                    cx,
                )
            })?;
            // Formatting is best effort: a failure is logged and the save proceeds.
            format_task.await.log_err();

            action_log.update(cx, |action_log, cx| {
                action_log.buffer_edited(buffer.clone(), cx);
            })?;
        }

        project
            .update(cx, |project, cx| project.save_buffer(buffer, cx))?
            .await
    })
}
1691
1692 pub fn to_markdown(&self, cx: &App) -> String {
1693 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1694 }
1695
/// Emits an `AcpThreadEvent::LoadError` event carrying `error`.
pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
    cx.emit(AcpThreadEvent::LoadError(error));
}
1699}
1700
1701fn markdown_for_raw_output(
1702 raw_output: &serde_json::Value,
1703 language_registry: &Arc<LanguageRegistry>,
1704 cx: &mut App,
1705) -> Option<Entity<Markdown>> {
1706 match raw_output {
1707 serde_json::Value::Null => None,
1708 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1709 Markdown::new(
1710 value.to_string().into(),
1711 Some(language_registry.clone()),
1712 None,
1713 cx,
1714 )
1715 })),
1716 serde_json::Value::Number(value) => Some(cx.new(|cx| {
1717 Markdown::new(
1718 value.to_string().into(),
1719 Some(language_registry.clone()),
1720 None,
1721 cx,
1722 )
1723 })),
1724 serde_json::Value::String(value) => Some(cx.new(|cx| {
1725 Markdown::new(
1726 value.clone().into(),
1727 Some(language_registry.clone()),
1728 None,
1729 cx,
1730 )
1731 })),
1732 value => Some(cx.new(|cx| {
1733 Markdown::new(
1734 format!("```json\n{}\n```", value).into(),
1735 Some(language_registry.clone()),
1736 None,
1737 cx,
1738 )
1739 })),
1740 }
1741}
1742
1743#[cfg(test)]
1744mod tests {
1745 use super::*;
1746 use anyhow::anyhow;
1747 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1748 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1749 use indoc::indoc;
1750 use project::{FakeFs, Fs};
1751 use rand::Rng as _;
1752 use serde_json::json;
1753 use settings::SettingsStore;
1754 use smol::stream::StreamExt as _;
1755 use std::{
1756 any::Any,
1757 cell::RefCell,
1758 path::Path,
1759 rc::Rc,
1760 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1761 time::Duration,
1762 };
1763 use util::path;
1764
/// Shared test setup: initializes logging (ignoring repeated initialization),
/// installs a test `SettingsStore` as a global, and initializes project and
/// language settings.
fn init_test(cx: &mut TestAppContext) {
    env_logger::try_init().ok();
    cx.update(|cx| {
        let settings_store = SettingsStore::test(cx);
        cx.set_global(settings_store);
        Project::init_settings(cx);
        language::init(cx);
    });
}
1774
/// Checks how `push_user_content_block` groups content into user messages:
/// consecutive user blocks are merged into one entry (taking over the newest
/// id), while a user block after an assistant message starts a new entry.
#[gpui::test]
async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    let connection = Rc::new(FakeAgentConnection::new());
    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // Test creating a new user message
    thread.update(cx, |thread, cx| {
        thread.push_user_content_block(
            None,
            acp::ContentBlock::Text(acp::TextContent {
                annotations: None,
                text: "Hello, ".to_string(),
            }),
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        assert_eq!(thread.entries.len(), 1);
        if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
            assert_eq!(user_msg.id, None);
            assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
        } else {
            panic!("Expected UserMessage");
        }
    });

    // Test appending to existing user message
    let message_1_id = UserMessageId::new();
    thread.update(cx, |thread, cx| {
        thread.push_user_content_block(
            Some(message_1_id.clone()),
            acp::ContentBlock::Text(acp::TextContent {
                annotations: None,
                text: "world!".to_string(),
            }),
            cx,
        );
    });

    // Still a single entry: the text was appended and the id was taken over.
    thread.update(cx, |thread, cx| {
        assert_eq!(thread.entries.len(), 1);
        if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
            assert_eq!(user_msg.id, Some(message_1_id));
            assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
        } else {
            panic!("Expected UserMessage");
        }
    });

    // Test creating new user message after assistant message
    thread.update(cx, |thread, cx| {
        thread.push_assistant_content_block(
            acp::ContentBlock::Text(acp::TextContent {
                annotations: None,
                text: "Assistant response".to_string(),
            }),
            false,
            cx,
        );
    });

    let message_2_id = UserMessageId::new();
    thread.update(cx, |thread, cx| {
        thread.push_user_content_block(
            Some(message_2_id.clone()),
            acp::ContentBlock::Text(acp::TextContent {
                annotations: None,
                text: "New user message".to_string(),
            }),
            cx,
        );
    });

    // Entries are now: user, assistant, user.
    thread.update(cx, |thread, cx| {
        assert_eq!(thread.entries.len(), 3);
        if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
            assert_eq!(user_msg.id, Some(message_2_id));
            assert_eq!(user_msg.content.to_markdown(cx), "New user message");
        } else {
            panic!("Expected UserMessage at index 2");
        }
    });
}
1866
/// Checks that consecutive agent thought chunks are concatenated into a single
/// `<thinking>` section in the thread's markdown output.
#[gpui::test]
async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    // The fake agent answers every prompt with two thought chunks.
    let connection = Rc::new(FakeAgentConnection::new().on_user_message(
        |_, thread, mut cx| {
            async move {
                thread.update(&mut cx, |thread, cx| {
                    thread
                        .handle_session_update(
                            acp::SessionUpdate::AgentThoughtChunk {
                                content: "Thinking ".into(),
                            },
                            cx,
                        )
                        .unwrap();
                    thread
                        .handle_session_update(
                            acp::SessionUpdate::AgentThoughtChunk {
                                content: "hard!".into(),
                            },
                            cx,
                        )
                        .unwrap();
                })?;
                Ok(acp::PromptResponse {
                    stop_reason: acp::StopReason::EndTurn,
                })
            }
            .boxed_local()
        },
    ));

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    thread
        .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
        .await
        .unwrap();

    let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert_eq!(
        output,
        indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

        "#}
    );
}
1929
/// Checks that an agent write based on an earlier read is merged with a user
/// edit made in between: the user's inserted line and the agent's appended
/// lines both end up in the buffer and on disk.
#[gpui::test]
async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
        .await;
    let project = Project::test(fs.clone(), [], cx).await;
    // Signals the test body once the fake agent has read the file.
    let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
    let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
    let connection = Rc::new(FakeAgentConnection::new().on_user_message(
        move |_, thread, mut cx| {
            let read_file_tx = read_file_tx.clone();
            async move {
                let content = thread
                    .update(&mut cx, |thread, cx| {
                        thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                    })
                    .unwrap()
                    .await
                    .unwrap();
                assert_eq!(content, "one\ntwo\nthree\n");
                read_file_tx.take().unwrap().send(()).unwrap();
                thread
                    .update(&mut cx, |thread, cx| {
                        thread.write_text_file(
                            path!("/tmp/foo").into(),
                            "one\ntwo\nthree\nfour\nfive\n".to_string(),
                            cx,
                        )
                    })
                    .unwrap()
                    .await
                    .unwrap();
                Ok(acp::PromptResponse {
                    stop_reason: acp::StopReason::EndTurn,
                })
            }
            .boxed_local()
        },
    ));

    let (worktree, pathbuf) = project
        .update(cx, |project, cx| {
            project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
        })
        .await
        .unwrap();
    let buffer = project
        .update(cx, |project, cx| {
            project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
        })
        .await
        .unwrap();

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
        .await
        .unwrap();

    let request = thread.update(cx, |thread, cx| {
        thread.send_raw("Extend the count in /tmp/foo", cx)
    });
    // Once the agent has read the file, simulate a user edit at the buffer start.
    read_file_rx.await.ok();
    buffer.update(cx, |buffer, cx| {
        buffer.edit([(0..0, "zero\n".to_string())], None, cx);
    });
    cx.run_until_parked();
    assert_eq!(
        buffer.read_with(cx, |buffer, _| buffer.text()),
        "zero\none\ntwo\nthree\nfour\nfive\n"
    );
    assert_eq!(
        String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
        "zero\none\ntwo\nthree\nfour\nfive\n"
    );
    request.await.unwrap();
}
2008
/// Checks that a tool call marked `Canceled` by `cancel` can still become
/// `Completed` when a later tool call update reports completion.
#[gpui::test]
async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    let id = acp::ToolCallId("test".into());

    // The fake agent reports a single in-progress tool call for every prompt.
    let connection = Rc::new(FakeAgentConnection::new().on_user_message({
        let id = id.clone();
        move |_, thread, mut cx| {
            let id = id.clone();
            async move {
                thread
                    .update(&mut cx, |thread, cx| {
                        thread.handle_session_update(
                            acp::SessionUpdate::ToolCall(acp::ToolCall {
                                id: id.clone(),
                                title: "Label".into(),
                                kind: acp::ToolKind::Fetch,
                                status: acp::ToolCallStatus::InProgress,
                                content: vec![],
                                locations: vec![],
                                raw_input: None,
                                raw_output: None,
                            }),
                            cx,
                        )
                    })
                    .unwrap()
                    .unwrap();
                Ok(acp::PromptResponse {
                    stop_reason: acp::StopReason::EndTurn,
                })
            }
            .boxed_local()
        }
    }));

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    let request = thread.update(cx, |thread, cx| {
        thread.send_raw("Fetch https://example.com", cx)
    });

    run_until_first_tool_call(&thread, cx).await;

    // Entry 0 is the user message, entry 1 the tool call.
    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::InProgress,
                ..
            })
        ));
    });

    thread.update(cx, |thread, cx| thread.cancel(cx)).await;

    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            &thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::Canceled,
                ..
            })
        ));
    });

    // A completion update arriving after the cancellation.
    thread
        .update(cx, |thread, cx| {
            thread.handle_session_update(
                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
                    id,
                    fields: acp::ToolCallUpdateFields {
                        status: Some(acp::ToolCallStatus::Completed),
                        ..Default::default()
                    },
                }),
                cx,
            )
        })
        .unwrap();

    request.await.unwrap();

    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::Completed,
                ..
            })
        ));
    });
}
2108
/// Checks that an edit tool call that is reported as already `Completed` does
/// not count as a pending edit tool call.
#[gpui::test]
async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
    init_test(cx);
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;

    // The fake agent reports one completed edit tool call carrying a diff.
    let connection = Rc::new(FakeAgentConnection::new().on_user_message({
        move |_, thread, mut cx| {
            async move {
                thread
                    .update(&mut cx, |thread, cx| {
                        thread.handle_session_update(
                            acp::SessionUpdate::ToolCall(acp::ToolCall {
                                id: acp::ToolCallId("test".into()),
                                title: "Label".into(),
                                kind: acp::ToolKind::Edit,
                                status: acp::ToolCallStatus::Completed,
                                content: vec![acp::ToolCallContent::Diff {
                                    diff: acp::Diff {
                                        path: "/test/test.txt".into(),
                                        old_text: None,
                                        new_text: "foo".into(),
                                    },
                                }],
                                locations: vec![],
                                raw_input: None,
                                raw_output: None,
                            }),
                            cx,
                        )
                    })
                    .unwrap()
                    .unwrap();
                Ok(acp::PromptResponse {
                    stop_reason: acp::StopReason::EndTurn,
                })
            }
            .boxed_local()
        }
    }));

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
        .await
        .unwrap();

    assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
}
2162
/// Exercises checkpoints across several turns:
/// - a turn that changes files marks its user message with a checkpoint,
/// - a turn without file changes does not,
/// - rewinding to a user message truncates the history and restores the files.
#[gpui::test(iterations = 10)]
async fn test_checkpoints(cx: &mut TestAppContext) {
    init_test(cx);
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.insert_tree(
        path!("/test"),
        json!({
            ".git": {}
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // While `simulate_changes` is set, each prompt creates a new empty file
    // named after `next_filename`. The agent replies with the upper-cased prompt.
    let simulate_changes = Arc::new(AtomicBool::new(true));
    let next_filename = Arc::new(AtomicUsize::new(0));
    let connection = Rc::new(FakeAgentConnection::new().on_user_message({
        let simulate_changes = simulate_changes.clone();
        let next_filename = next_filename.clone();
        let fs = fs.clone();
        move |request, thread, mut cx| {
            let fs = fs.clone();
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            async move {
                if simulate_changes.load(SeqCst) {
                    let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                    fs.write(Path::new(&filename), b"").await?;
                }

                let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                    panic!("expected text content block");
                };
                thread.update(&mut cx, |thread, cx| {
                    thread
                        .handle_session_update(
                            acp::SessionUpdate::AgentMessageChunk {
                                content: content.text.to_uppercase().into(),
                            },
                            cx,
                        )
                        .unwrap();
                })?;
                Ok(acp::PromptResponse {
                    stop_reason: acp::StopReason::EndTurn,
                })
            }
            .boxed_local()
        }
    }));
    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // First turn: a file is created, so the user message shows a checkpoint.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

    // Second turn: another file is created, another checkpoint is shown.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

            "}
        );
    });
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Checkpoint isn't stored when there are no changes.
    simulate_changes.store(false, SeqCst);
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

                ## User

                dolor

                ## Assistant

                DOLOR

            "}
        );
    });
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Rewinding the conversation truncates the history and restores the checkpoint.
    thread
        .update(cx, |thread, cx| {
            // Entry 2 is the second user message ("ipsum").
            let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                panic!("unexpected entries {:?}", thread.entries)
            };
            thread.rewind(message.id.clone().unwrap(), cx)
        })
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
}
2342
2343 async fn run_until_first_tool_call(
2344 thread: &Entity<AcpThread>,
2345 cx: &mut TestAppContext,
2346 ) -> usize {
2347 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2348
2349 let subscription = cx.update(|cx| {
2350 cx.subscribe(thread, move |thread, _, cx| {
2351 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2352 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2353 return tx.try_send(ix).unwrap();
2354 }
2355 }
2356 })
2357 });
2358
2359 select! {
2360 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2361 panic!("Timeout waiting for tool call")
2362 }
2363 ix = rx.next().fuse() => {
2364 drop(subscription);
2365 ix.unwrap()
2366 }
2367 }
2368 }
2369
/// In-memory `AgentConnection` used by the tests in this module.
#[derive(Clone, Default)]
struct FakeAgentConnection {
    // Authentication methods reported by `auth_methods` and accepted by `authenticate`.
    auth_methods: Vec<acp::AuthMethod>,
    // Threads created through `new_thread`, keyed by their session id.
    sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
    // Optional handler that produces the response to a prompt.
    on_user_message: Option<
        Rc<
            dyn Fn(
                    acp::PromptRequest,
                    WeakEntity<AcpThread>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
                + 'static,
        >,
    >,
}
2385
2386 impl FakeAgentConnection {
2387 fn new() -> Self {
2388 Self {
2389 auth_methods: Vec::new(),
2390 on_user_message: None,
2391 sessions: Arc::default(),
2392 }
2393 }
2394
2395 #[expect(unused)]
2396 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2397 self.auth_methods = auth_methods;
2398 self
2399 }
2400
2401 fn on_user_message(
2402 mut self,
2403 handler: impl Fn(
2404 acp::PromptRequest,
2405 WeakEntity<AcpThread>,
2406 AsyncApp,
2407 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2408 + 'static,
2409 ) -> Self {
2410 self.on_user_message.replace(Rc::new(handler));
2411 self
2412 }
2413 }
2414
2415 impl AgentConnection for FakeAgentConnection {
2416 fn auth_methods(&self) -> &[acp::AuthMethod] {
2417 &self.auth_methods
2418 }
2419
2420 fn new_thread(
2421 self: Rc<Self>,
2422 project: Entity<Project>,
2423 _cwd: &Path,
2424 cx: &mut App,
2425 ) -> Task<gpui::Result<Entity<AcpThread>>> {
2426 let session_id = acp::SessionId(
2427 rand::thread_rng()
2428 .sample_iter(&rand::distributions::Alphanumeric)
2429 .take(7)
2430 .map(char::from)
2431 .collect::<String>()
2432 .into(),
2433 );
2434 let action_log = cx.new(|_| ActionLog::new(project.clone()));
2435 let thread = cx.new(|_cx| {
2436 AcpThread::new(
2437 "Test",
2438 self.clone(),
2439 project,
2440 action_log,
2441 session_id.clone(),
2442 )
2443 });
2444 self.sessions.lock().insert(session_id, thread.downgrade());
2445 Task::ready(Ok(thread))
2446 }
2447
2448 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2449 if self.auth_methods().iter().any(|m| m.id == method) {
2450 Task::ready(Ok(()))
2451 } else {
2452 Task::ready(Err(anyhow!("Invalid Auth Method")))
2453 }
2454 }
2455
2456 fn prompt(
2457 &self,
2458 _id: Option<UserMessageId>,
2459 params: acp::PromptRequest,
2460 cx: &mut App,
2461 ) -> Task<gpui::Result<acp::PromptResponse>> {
2462 let sessions = self.sessions.lock();
2463 let thread = sessions.get(¶ms.session_id).unwrap();
2464 if let Some(handler) = &self.on_user_message {
2465 let handler = handler.clone();
2466 let thread = thread.clone();
2467 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2468 } else {
2469 Task::ready(Ok(acp::PromptResponse {
2470 stop_reason: acp::StopReason::EndTurn,
2471 }))
2472 }
2473 }
2474
2475 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2476 let sessions = self.sessions.lock();
2477 let thread = sessions.get(session_id).unwrap().clone();
2478
2479 cx.spawn(async move |cx| {
2480 thread
2481 .update(cx, |thread, cx| thread.cancel(cx))
2482 .unwrap()
2483 .await
2484 })
2485 .detach();
2486 }
2487
2488 fn session_editor(
2489 &self,
2490 session_id: &acp::SessionId,
2491 _cx: &mut App,
2492 ) -> Option<Rc<dyn AgentSessionEditor>> {
2493 Some(Rc::new(FakeAgentSessionEditor {
2494 _session_id: session_id.clone(),
2495 }))
2496 }
2497
2498 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2499 self
2500 }
2501 }
2502
    /// Session editor returned by `FakeAgentConnection::session_editor`.
    struct FakeAgentSessionEditor {
        /// Id of the session this editor was created for. The `truncate`
        /// implementation in this file does not consult it.
        _session_id: acp::SessionId,
    }
2506
2507 impl AgentSessionEditor for FakeAgentSessionEditor {
2508 fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2509 Task::ready(Ok(()))
2510 }
2511 }
2512}