1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use collections::HashSet;
7pub use connection::*;
8pub use diff::*;
9use language::language_settings::FormatOnSave;
10pub use mention::*;
11use project::lsp_store::{FormatTrigger, LspFormatTarget};
12use serde::{Deserialize, Serialize};
13pub use terminal::*;
14
15use action_log::ActionLog;
16use agent_client_protocol as acp;
17use anyhow::{Context as _, Result, anyhow};
18use editor::Bias;
19use futures::{FutureExt, channel::oneshot, future::BoxFuture};
20use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
21use itertools::Itertools;
22use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
23use markdown::Markdown;
24use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
25use std::collections::HashMap;
26use std::error::Error;
27use std::fmt::{Formatter, Write};
28use std::ops::Range;
29use std::process::ExitStatus;
30use std::rc::Rc;
31use std::time::{Duration, Instant};
32use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
33use ui::App;
34use util::ResultExt;
35
36#[derive(Debug)]
37pub struct UserMessage {
38 pub id: Option<UserMessageId>,
39 pub content: ContentBlock,
40 pub chunks: Vec<acp::ContentBlock>,
41 pub checkpoint: Option<Checkpoint>,
42}
43
44#[derive(Debug)]
45pub struct Checkpoint {
46 git_checkpoint: GitStoreCheckpoint,
47 pub show: bool,
48}
49
50impl UserMessage {
51 fn to_markdown(&self, cx: &App) -> String {
52 let mut markdown = String::new();
53 if self
54 .checkpoint
55 .as_ref()
56 .is_some_and(|checkpoint| checkpoint.show)
57 {
58 writeln!(markdown, "## User (checkpoint)").unwrap();
59 } else {
60 writeln!(markdown, "## User").unwrap();
61 }
62 writeln!(markdown).unwrap();
63 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
64 writeln!(markdown).unwrap();
65 markdown
66 }
67}
68
69#[derive(Debug, PartialEq)]
70pub struct AssistantMessage {
71 pub chunks: Vec<AssistantMessageChunk>,
72}
73
74impl AssistantMessage {
75 pub fn to_markdown(&self, cx: &App) -> String {
76 format!(
77 "## Assistant\n\n{}\n\n",
78 self.chunks
79 .iter()
80 .map(|chunk| chunk.to_markdown(cx))
81 .join("\n\n")
82 )
83 }
84}
85
86#[derive(Debug, PartialEq)]
87pub enum AssistantMessageChunk {
88 Message { block: ContentBlock },
89 Thought { block: ContentBlock },
90}
91
92impl AssistantMessageChunk {
93 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
94 Self::Message {
95 block: ContentBlock::new(chunk.into(), language_registry, cx),
96 }
97 }
98
99 fn to_markdown(&self, cx: &App) -> String {
100 match self {
101 Self::Message { block } => block.to_markdown(cx).to_string(),
102 Self::Thought { block } => {
103 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
104 }
105 }
106 }
107}
108
109#[derive(Debug)]
110pub enum AgentThreadEntry {
111 UserMessage(UserMessage),
112 AssistantMessage(AssistantMessage),
113 ToolCall(ToolCall),
114}
115
116impl AgentThreadEntry {
117 pub fn to_markdown(&self, cx: &App) -> String {
118 match self {
119 Self::UserMessage(message) => message.to_markdown(cx),
120 Self::AssistantMessage(message) => message.to_markdown(cx),
121 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
122 }
123 }
124
125 pub fn user_message(&self) -> Option<&UserMessage> {
126 if let AgentThreadEntry::UserMessage(message) = self {
127 Some(message)
128 } else {
129 None
130 }
131 }
132
133 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
134 if let AgentThreadEntry::ToolCall(call) = self {
135 itertools::Either::Left(call.diffs())
136 } else {
137 itertools::Either::Right(std::iter::empty())
138 }
139 }
140
141 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
142 if let AgentThreadEntry::ToolCall(call) = self {
143 itertools::Either::Left(call.terminals())
144 } else {
145 itertools::Either::Right(std::iter::empty())
146 }
147 }
148
149 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
150 if let AgentThreadEntry::ToolCall(ToolCall {
151 locations,
152 resolved_locations,
153 ..
154 }) = self
155 {
156 Some((
157 locations.get(ix)?.clone(),
158 resolved_locations.get(ix)?.clone()?,
159 ))
160 } else {
161 None
162 }
163 }
164}
165
166#[derive(Debug)]
167pub struct ToolCall {
168 pub id: acp::ToolCallId,
169 pub label: Entity<Markdown>,
170 pub kind: acp::ToolKind,
171 pub content: Vec<ToolCallContent>,
172 pub status: ToolCallStatus,
173 pub locations: Vec<acp::ToolCallLocation>,
174 pub resolved_locations: Vec<Option<AgentLocation>>,
175 pub raw_input: Option<serde_json::Value>,
176 pub raw_output: Option<serde_json::Value>,
177}
178
179impl ToolCall {
180 fn from_acp(
181 tool_call: acp::ToolCall,
182 status: ToolCallStatus,
183 language_registry: Arc<LanguageRegistry>,
184 cx: &mut App,
185 ) -> Self {
186 Self {
187 id: tool_call.id,
188 label: cx.new(|cx| {
189 Markdown::new(
190 tool_call.title.into(),
191 Some(language_registry.clone()),
192 None,
193 cx,
194 )
195 }),
196 kind: tool_call.kind,
197 content: tool_call
198 .content
199 .into_iter()
200 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
201 .collect(),
202 locations: tool_call.locations,
203 resolved_locations: Vec::default(),
204 status,
205 raw_input: tool_call.raw_input,
206 raw_output: tool_call.raw_output,
207 }
208 }
209
210 fn update_fields(
211 &mut self,
212 fields: acp::ToolCallUpdateFields,
213 language_registry: Arc<LanguageRegistry>,
214 cx: &mut App,
215 ) {
216 let acp::ToolCallUpdateFields {
217 kind,
218 status,
219 title,
220 content,
221 locations,
222 raw_input,
223 raw_output,
224 } = fields;
225
226 if let Some(kind) = kind {
227 self.kind = kind;
228 }
229
230 if let Some(status) = status {
231 self.status = status.into();
232 }
233
234 if let Some(title) = title {
235 self.label.update(cx, |label, cx| {
236 label.replace(title, cx);
237 });
238 }
239
240 if let Some(content) = content {
241 self.content = content
242 .into_iter()
243 .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
244 .collect();
245 }
246
247 if let Some(locations) = locations {
248 self.locations = locations;
249 }
250
251 if let Some(raw_input) = raw_input {
252 self.raw_input = Some(raw_input);
253 }
254
255 if let Some(raw_output) = raw_output {
256 if self.content.is_empty()
257 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
258 {
259 self.content
260 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
261 markdown,
262 }));
263 }
264 self.raw_output = Some(raw_output);
265 }
266 }
267
268 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
269 self.content.iter().filter_map(|content| match content {
270 ToolCallContent::Diff(diff) => Some(diff),
271 ToolCallContent::ContentBlock(_) => None,
272 ToolCallContent::Terminal(_) => None,
273 })
274 }
275
276 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
277 self.content.iter().filter_map(|content| match content {
278 ToolCallContent::Terminal(terminal) => Some(terminal),
279 ToolCallContent::ContentBlock(_) => None,
280 ToolCallContent::Diff(_) => None,
281 })
282 }
283
284 fn to_markdown(&self, cx: &App) -> String {
285 let mut markdown = format!(
286 "**Tool Call: {}**\nStatus: {}\n\n",
287 self.label.read(cx).source(),
288 self.status
289 );
290 for content in &self.content {
291 markdown.push_str(content.to_markdown(cx).as_str());
292 markdown.push_str("\n\n");
293 }
294 markdown
295 }
296
297 async fn resolve_location(
298 location: acp::ToolCallLocation,
299 project: WeakEntity<Project>,
300 cx: &mut AsyncApp,
301 ) -> Option<AgentLocation> {
302 let buffer = project
303 .update(cx, |project, cx| {
304 if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
305 Some(project.open_buffer(path, cx))
306 } else {
307 None
308 }
309 })
310 .ok()??;
311 let buffer = buffer.await.log_err()?;
312 let position = buffer
313 .update(cx, |buffer, _| {
314 if let Some(row) = location.line {
315 let snapshot = buffer.snapshot();
316 let column = snapshot.indent_size_for_line(row).len;
317 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
318 snapshot.anchor_before(point)
319 } else {
320 Anchor::MIN
321 }
322 })
323 .ok()?;
324
325 Some(AgentLocation {
326 buffer: buffer.downgrade(),
327 position,
328 })
329 }
330
331 fn resolve_locations(
332 &self,
333 project: Entity<Project>,
334 cx: &mut App,
335 ) -> Task<Vec<Option<AgentLocation>>> {
336 let locations = self.locations.clone();
337 project.update(cx, |_, cx| {
338 cx.spawn(async move |project, cx| {
339 let mut new_locations = Vec::new();
340 for location in locations {
341 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
342 }
343 new_locations
344 })
345 })
346 }
347}
348
349#[derive(Debug)]
350pub enum ToolCallStatus {
351 /// The tool call hasn't started running yet, but we start showing it to
352 /// the user.
353 Pending,
354 /// The tool call is waiting for confirmation from the user.
355 WaitingForConfirmation {
356 options: Vec<acp::PermissionOption>,
357 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
358 },
359 /// The tool call is currently running.
360 InProgress,
361 /// The tool call completed successfully.
362 Completed,
363 /// The tool call failed.
364 Failed,
365 /// The user rejected the tool call.
366 Rejected,
367 /// The user canceled generation so the tool call was canceled.
368 Canceled,
369}
370
371impl From<acp::ToolCallStatus> for ToolCallStatus {
372 fn from(status: acp::ToolCallStatus) -> Self {
373 match status {
374 acp::ToolCallStatus::Pending => Self::Pending,
375 acp::ToolCallStatus::InProgress => Self::InProgress,
376 acp::ToolCallStatus::Completed => Self::Completed,
377 acp::ToolCallStatus::Failed => Self::Failed,
378 }
379 }
380}
381
382impl Display for ToolCallStatus {
383 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
384 write!(
385 f,
386 "{}",
387 match self {
388 ToolCallStatus::Pending => "Pending",
389 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
390 ToolCallStatus::InProgress => "In Progress",
391 ToolCallStatus::Completed => "Completed",
392 ToolCallStatus::Failed => "Failed",
393 ToolCallStatus::Rejected => "Rejected",
394 ToolCallStatus::Canceled => "Canceled",
395 }
396 )
397 }
398}
399
400#[derive(Debug, PartialEq, Clone)]
401pub enum ContentBlock {
402 Empty,
403 Markdown { markdown: Entity<Markdown> },
404 ResourceLink { resource_link: acp::ResourceLink },
405}
406
407impl ContentBlock {
408 pub fn new(
409 block: acp::ContentBlock,
410 language_registry: &Arc<LanguageRegistry>,
411 cx: &mut App,
412 ) -> Self {
413 let mut this = Self::Empty;
414 this.append(block, language_registry, cx);
415 this
416 }
417
418 pub fn new_combined(
419 blocks: impl IntoIterator<Item = acp::ContentBlock>,
420 language_registry: Arc<LanguageRegistry>,
421 cx: &mut App,
422 ) -> Self {
423 let mut this = Self::Empty;
424 for block in blocks {
425 this.append(block, &language_registry, cx);
426 }
427 this
428 }
429
430 pub fn append(
431 &mut self,
432 block: acp::ContentBlock,
433 language_registry: &Arc<LanguageRegistry>,
434 cx: &mut App,
435 ) {
436 if matches!(self, ContentBlock::Empty)
437 && let acp::ContentBlock::ResourceLink(resource_link) = block
438 {
439 *self = ContentBlock::ResourceLink { resource_link };
440 return;
441 }
442
443 let new_content = self.block_string_contents(block);
444
445 match self {
446 ContentBlock::Empty => {
447 *self = Self::create_markdown_block(new_content, language_registry, cx);
448 }
449 ContentBlock::Markdown { markdown } => {
450 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
451 }
452 ContentBlock::ResourceLink { resource_link } => {
453 let existing_content = Self::resource_link_md(&resource_link.uri);
454 let combined = format!("{}\n{}", existing_content, new_content);
455
456 *self = Self::create_markdown_block(combined, language_registry, cx);
457 }
458 }
459 }
460
461 fn create_markdown_block(
462 content: String,
463 language_registry: &Arc<LanguageRegistry>,
464 cx: &mut App,
465 ) -> ContentBlock {
466 ContentBlock::Markdown {
467 markdown: cx
468 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
469 }
470 }
471
472 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
473 match block {
474 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
475 acp::ContentBlock::ResourceLink(resource_link) => {
476 Self::resource_link_md(&resource_link.uri)
477 }
478 acp::ContentBlock::Resource(acp::EmbeddedResource {
479 resource:
480 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
481 uri,
482 ..
483 }),
484 ..
485 }) => Self::resource_link_md(&uri),
486 acp::ContentBlock::Image(image) => Self::image_md(&image),
487 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
488 }
489 }
490
491 fn resource_link_md(uri: &str) -> String {
492 if let Some(uri) = MentionUri::parse(uri).log_err() {
493 uri.as_link().to_string()
494 } else {
495 uri.to_string()
496 }
497 }
498
499 fn image_md(_image: &acp::ImageContent) -> String {
500 "`Image`".into()
501 }
502
503 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
504 match self {
505 ContentBlock::Empty => "",
506 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
507 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
508 }
509 }
510
511 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
512 match self {
513 ContentBlock::Empty => None,
514 ContentBlock::Markdown { markdown } => Some(markdown),
515 ContentBlock::ResourceLink { .. } => None,
516 }
517 }
518
519 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
520 match self {
521 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
522 _ => None,
523 }
524 }
525}
526
527#[derive(Debug)]
528pub enum ToolCallContent {
529 ContentBlock(ContentBlock),
530 Diff(Entity<Diff>),
531 Terminal(Entity<Terminal>),
532}
533
534impl ToolCallContent {
535 pub fn from_acp(
536 content: acp::ToolCallContent,
537 language_registry: Arc<LanguageRegistry>,
538 cx: &mut App,
539 ) -> Self {
540 match content {
541 acp::ToolCallContent::Content { content } => {
542 Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
543 }
544 acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| {
545 Diff::finalized(
546 diff.path,
547 diff.old_text,
548 diff.new_text,
549 language_registry,
550 cx,
551 )
552 })),
553 }
554 }
555
556 pub fn to_markdown(&self, cx: &App) -> String {
557 match self {
558 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
559 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
560 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
561 }
562 }
563}
564
565#[derive(Debug, PartialEq)]
566pub enum ToolCallUpdate {
567 UpdateFields(acp::ToolCallUpdate),
568 UpdateDiff(ToolCallUpdateDiff),
569 UpdateTerminal(ToolCallUpdateTerminal),
570}
571
572impl ToolCallUpdate {
573 fn id(&self) -> &acp::ToolCallId {
574 match self {
575 Self::UpdateFields(update) => &update.id,
576 Self::UpdateDiff(diff) => &diff.id,
577 Self::UpdateTerminal(terminal) => &terminal.id,
578 }
579 }
580}
581
582impl From<acp::ToolCallUpdate> for ToolCallUpdate {
583 fn from(update: acp::ToolCallUpdate) -> Self {
584 Self::UpdateFields(update)
585 }
586}
587
588impl From<ToolCallUpdateDiff> for ToolCallUpdate {
589 fn from(diff: ToolCallUpdateDiff) -> Self {
590 Self::UpdateDiff(diff)
591 }
592}
593
594#[derive(Debug, PartialEq)]
595pub struct ToolCallUpdateDiff {
596 pub id: acp::ToolCallId,
597 pub diff: Entity<Diff>,
598}
599
600impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
601 fn from(terminal: ToolCallUpdateTerminal) -> Self {
602 Self::UpdateTerminal(terminal)
603 }
604}
605
606#[derive(Debug, PartialEq)]
607pub struct ToolCallUpdateTerminal {
608 pub id: acp::ToolCallId,
609 pub terminal: Entity<Terminal>,
610}
611
612#[derive(Debug, Default)]
613pub struct Plan {
614 pub entries: Vec<PlanEntry>,
615}
616
617#[derive(Debug)]
618pub struct PlanStats<'a> {
619 pub in_progress_entry: Option<&'a PlanEntry>,
620 pub pending: u32,
621 pub completed: u32,
622}
623
624impl Plan {
625 pub fn is_empty(&self) -> bool {
626 self.entries.is_empty()
627 }
628
629 pub fn stats(&self) -> PlanStats<'_> {
630 let mut stats = PlanStats {
631 in_progress_entry: None,
632 pending: 0,
633 completed: 0,
634 };
635
636 for entry in &self.entries {
637 match &entry.status {
638 acp::PlanEntryStatus::Pending => {
639 stats.pending += 1;
640 }
641 acp::PlanEntryStatus::InProgress => {
642 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
643 }
644 acp::PlanEntryStatus::Completed => {
645 stats.completed += 1;
646 }
647 }
648 }
649
650 stats
651 }
652}
653
654#[derive(Debug)]
655pub struct PlanEntry {
656 pub content: Entity<Markdown>,
657 pub priority: acp::PlanEntryPriority,
658 pub status: acp::PlanEntryStatus,
659}
660
661impl PlanEntry {
662 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
663 Self {
664 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
665 priority: entry.priority,
666 status: entry.status,
667 }
668 }
669}
670
671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
672pub struct TokenUsage {
673 pub max_tokens: u64,
674 pub used_tokens: u64,
675}
676
677#[derive(Debug, Clone)]
678pub struct RetryStatus {
679 pub last_error: SharedString,
680 pub attempt: usize,
681 pub max_attempts: usize,
682 pub started_at: Instant,
683 pub duration: Duration,
684}
685
686pub struct AcpThread {
687 title: SharedString,
688 entries: Vec<AgentThreadEntry>,
689 plan: Plan,
690 project: Entity<Project>,
691 action_log: Entity<ActionLog>,
692 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
693 send_task: Option<Task<()>>,
694 connection: Rc<dyn AgentConnection>,
695 session_id: acp::SessionId,
696 token_usage: Option<TokenUsage>,
697}
698
699#[derive(Debug)]
700pub enum AcpThreadEvent {
701 NewEntry,
702 TitleUpdated,
703 TokenUsageUpdated,
704 EntryUpdated(usize),
705 EntriesRemoved(Range<usize>),
706 ToolAuthorizationRequired,
707 Retry(RetryStatus),
708 Stopped,
709 Error,
710 ServerExited(ExitStatus),
711}
712
713impl EventEmitter<AcpThreadEvent> for AcpThread {}
714
715#[derive(PartialEq, Eq)]
716pub enum ThreadStatus {
717 Idle,
718 WaitingForToolConfirmation,
719 Generating,
720}
721
722#[derive(Debug, Clone)]
723pub enum LoadError {
724 Unsupported {
725 error_message: SharedString,
726 upgrade_message: SharedString,
727 upgrade_command: String,
728 },
729 Exited(i32),
730 Other(SharedString),
731}
732
733impl Display for LoadError {
734 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
735 match self {
736 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
737 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
738 LoadError::Other(msg) => write!(f, "{}", msg),
739 }
740 }
741}
742
743impl Error for LoadError {}
744
745impl AcpThread {
746 pub fn new(
747 title: impl Into<SharedString>,
748 connection: Rc<dyn AgentConnection>,
749 project: Entity<Project>,
750 action_log: Entity<ActionLog>,
751 session_id: acp::SessionId,
752 ) -> Self {
753 Self {
754 action_log,
755 shared_buffers: Default::default(),
756 entries: Default::default(),
757 plan: Default::default(),
758 title: title.into(),
759 project,
760 send_task: None,
761 connection,
762 session_id,
763 token_usage: None,
764 }
765 }
766
767 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
768 &self.connection
769 }
770
771 pub fn action_log(&self) -> &Entity<ActionLog> {
772 &self.action_log
773 }
774
775 pub fn project(&self) -> &Entity<Project> {
776 &self.project
777 }
778
779 pub fn title(&self) -> SharedString {
780 self.title.clone()
781 }
782
783 pub fn entries(&self) -> &[AgentThreadEntry] {
784 &self.entries
785 }
786
787 pub fn session_id(&self) -> &acp::SessionId {
788 &self.session_id
789 }
790
791 pub fn status(&self) -> ThreadStatus {
792 if self.send_task.is_some() {
793 if self.waiting_for_tool_confirmation() {
794 ThreadStatus::WaitingForToolConfirmation
795 } else {
796 ThreadStatus::Generating
797 }
798 } else {
799 ThreadStatus::Idle
800 }
801 }
802
803 pub fn token_usage(&self) -> Option<&TokenUsage> {
804 self.token_usage.as_ref()
805 }
806
807 pub fn has_pending_edit_tool_calls(&self) -> bool {
808 for entry in self.entries.iter().rev() {
809 match entry {
810 AgentThreadEntry::UserMessage(_) => return false,
811 AgentThreadEntry::ToolCall(
812 call @ ToolCall {
813 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
814 ..
815 },
816 ) if call.diffs().next().is_some() => {
817 return true;
818 }
819 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
820 }
821 }
822
823 false
824 }
825
826 pub fn used_tools_since_last_user_message(&self) -> bool {
827 for entry in self.entries.iter().rev() {
828 match entry {
829 AgentThreadEntry::UserMessage(..) => return false,
830 AgentThreadEntry::AssistantMessage(..) => continue,
831 AgentThreadEntry::ToolCall(..) => return true,
832 }
833 }
834
835 false
836 }
837
838 pub fn handle_session_update(
839 &mut self,
840 update: acp::SessionUpdate,
841 cx: &mut Context<Self>,
842 ) -> Result<(), acp::Error> {
843 match update {
844 acp::SessionUpdate::UserMessageChunk { content } => {
845 self.push_user_content_block(None, content, cx);
846 }
847 acp::SessionUpdate::AgentMessageChunk { content } => {
848 self.push_assistant_content_block(content, false, cx);
849 }
850 acp::SessionUpdate::AgentThoughtChunk { content } => {
851 self.push_assistant_content_block(content, true, cx);
852 }
853 acp::SessionUpdate::ToolCall(tool_call) => {
854 self.upsert_tool_call(tool_call, cx)?;
855 }
856 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
857 self.update_tool_call(tool_call_update, cx)?;
858 }
859 acp::SessionUpdate::Plan(plan) => {
860 self.update_plan(plan, cx);
861 }
862 }
863 Ok(())
864 }
865
866 pub fn push_user_content_block(
867 &mut self,
868 message_id: Option<UserMessageId>,
869 chunk: acp::ContentBlock,
870 cx: &mut Context<Self>,
871 ) {
872 let language_registry = self.project.read(cx).languages().clone();
873 let entries_len = self.entries.len();
874
875 if let Some(last_entry) = self.entries.last_mut()
876 && let AgentThreadEntry::UserMessage(UserMessage {
877 id,
878 content,
879 chunks,
880 ..
881 }) = last_entry
882 {
883 *id = message_id.or(id.take());
884 content.append(chunk.clone(), &language_registry, cx);
885 chunks.push(chunk);
886 let idx = entries_len - 1;
887 cx.emit(AcpThreadEvent::EntryUpdated(idx));
888 } else {
889 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
890 self.push_entry(
891 AgentThreadEntry::UserMessage(UserMessage {
892 id: message_id,
893 content,
894 chunks: vec![chunk],
895 checkpoint: None,
896 }),
897 cx,
898 );
899 }
900 }
901
902 pub fn push_assistant_content_block(
903 &mut self,
904 chunk: acp::ContentBlock,
905 is_thought: bool,
906 cx: &mut Context<Self>,
907 ) {
908 let language_registry = self.project.read(cx).languages().clone();
909 let entries_len = self.entries.len();
910 if let Some(last_entry) = self.entries.last_mut()
911 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
912 {
913 let idx = entries_len - 1;
914 cx.emit(AcpThreadEvent::EntryUpdated(idx));
915 match (chunks.last_mut(), is_thought) {
916 (Some(AssistantMessageChunk::Message { block }), false)
917 | (Some(AssistantMessageChunk::Thought { block }), true) => {
918 block.append(chunk, &language_registry, cx)
919 }
920 _ => {
921 let block = ContentBlock::new(chunk, &language_registry, cx);
922 if is_thought {
923 chunks.push(AssistantMessageChunk::Thought { block })
924 } else {
925 chunks.push(AssistantMessageChunk::Message { block })
926 }
927 }
928 }
929 } else {
930 let block = ContentBlock::new(chunk, &language_registry, cx);
931 let chunk = if is_thought {
932 AssistantMessageChunk::Thought { block }
933 } else {
934 AssistantMessageChunk::Message { block }
935 };
936
937 self.push_entry(
938 AgentThreadEntry::AssistantMessage(AssistantMessage {
939 chunks: vec![chunk],
940 }),
941 cx,
942 );
943 }
944 }
945
946 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
947 self.entries.push(entry);
948 cx.emit(AcpThreadEvent::NewEntry);
949 }
950
951 pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
952 self.title = title;
953 cx.emit(AcpThreadEvent::TitleUpdated);
954 Ok(())
955 }
956
957 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
958 self.token_usage = usage;
959 cx.emit(AcpThreadEvent::TokenUsageUpdated);
960 }
961
962 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
963 cx.emit(AcpThreadEvent::Retry(status));
964 }
965
966 pub fn update_tool_call(
967 &mut self,
968 update: impl Into<ToolCallUpdate>,
969 cx: &mut Context<Self>,
970 ) -> Result<()> {
971 let update = update.into();
972 let languages = self.project.read(cx).languages().clone();
973
974 let (ix, current_call) = self
975 .tool_call_mut(update.id())
976 .context("Tool call not found")?;
977 match update {
978 ToolCallUpdate::UpdateFields(update) => {
979 let location_updated = update.fields.locations.is_some();
980 current_call.update_fields(update.fields, languages, cx);
981 if location_updated {
982 self.resolve_locations(update.id.clone(), cx);
983 }
984 }
985 ToolCallUpdate::UpdateDiff(update) => {
986 current_call.content.clear();
987 current_call
988 .content
989 .push(ToolCallContent::Diff(update.diff));
990 }
991 ToolCallUpdate::UpdateTerminal(update) => {
992 current_call.content.clear();
993 current_call
994 .content
995 .push(ToolCallContent::Terminal(update.terminal));
996 }
997 }
998
999 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1000
1001 Ok(())
1002 }
1003
1004 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1005 pub fn upsert_tool_call(
1006 &mut self,
1007 tool_call: acp::ToolCall,
1008 cx: &mut Context<Self>,
1009 ) -> Result<(), acp::Error> {
1010 let status = tool_call.status.into();
1011 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1012 }
1013
1014 /// Fails if id does not match an existing entry.
1015 pub fn upsert_tool_call_inner(
1016 &mut self,
1017 tool_call_update: acp::ToolCallUpdate,
1018 status: ToolCallStatus,
1019 cx: &mut Context<Self>,
1020 ) -> Result<(), acp::Error> {
1021 let language_registry = self.project.read(cx).languages().clone();
1022 let id = tool_call_update.id.clone();
1023
1024 if let Some((ix, current_call)) = self.tool_call_mut(&id) {
1025 current_call.update_fields(tool_call_update.fields, language_registry, cx);
1026 current_call.status = status;
1027
1028 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1029 } else {
1030 let call =
1031 ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx);
1032 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1033 };
1034
1035 self.resolve_locations(id, cx);
1036 Ok(())
1037 }
1038
1039 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1040 // The tool call we are looking for is typically the last one, or very close to the end.
1041 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1042 self.entries
1043 .iter_mut()
1044 .enumerate()
1045 .rev()
1046 .find_map(|(index, tool_call)| {
1047 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1048 && &tool_call.id == id
1049 {
1050 Some((index, tool_call))
1051 } else {
1052 None
1053 }
1054 })
1055 }
1056
1057 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1058 self.entries
1059 .iter()
1060 .enumerate()
1061 .rev()
1062 .find_map(|(index, tool_call)| {
1063 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1064 && &tool_call.id == id
1065 {
1066 Some((index, tool_call))
1067 } else {
1068 None
1069 }
1070 })
1071 }
1072
1073 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1074 let project = self.project.clone();
1075 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1076 return;
1077 };
1078 let task = tool_call.resolve_locations(project, cx);
1079 cx.spawn(async move |this, cx| {
1080 let resolved_locations = task.await;
1081 this.update(cx, |this, cx| {
1082 let project = this.project.clone();
1083 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1084 return;
1085 };
1086 if let Some(Some(location)) = resolved_locations.last() {
1087 project.update(cx, |project, cx| {
1088 if let Some(agent_location) = project.agent_location() {
1089 let should_ignore = agent_location.buffer == location.buffer
1090 && location
1091 .buffer
1092 .update(cx, |buffer, _| {
1093 let snapshot = buffer.snapshot();
1094 let old_position =
1095 agent_location.position.to_point(&snapshot);
1096 let new_position = location.position.to_point(&snapshot);
1097 // ignore this so that when we get updates from the edit tool
1098 // the position doesn't reset to the startof line
1099 old_position.row == new_position.row
1100 && old_position.column > new_position.column
1101 })
1102 .ok()
1103 .unwrap_or_default();
1104 if !should_ignore {
1105 project.set_agent_location(Some(location.clone()), cx);
1106 }
1107 }
1108 });
1109 }
1110 if tool_call.resolved_locations != resolved_locations {
1111 tool_call.resolved_locations = resolved_locations;
1112 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1113 }
1114 })
1115 })
1116 .detach();
1117 }
1118
1119 pub fn request_tool_call_authorization(
1120 &mut self,
1121 tool_call: acp::ToolCallUpdate,
1122 options: Vec<acp::PermissionOption>,
1123 cx: &mut Context<Self>,
1124 ) -> Result<oneshot::Receiver<acp::PermissionOptionId>, acp::Error> {
1125 let (tx, rx) = oneshot::channel();
1126
1127 let status = ToolCallStatus::WaitingForConfirmation {
1128 options,
1129 respond_tx: tx,
1130 };
1131
1132 self.upsert_tool_call_inner(tool_call, status, cx)?;
1133 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1134 Ok(rx)
1135 }
1136
1137 pub fn authorize_tool_call(
1138 &mut self,
1139 id: acp::ToolCallId,
1140 option_id: acp::PermissionOptionId,
1141 option_kind: acp::PermissionOptionKind,
1142 cx: &mut Context<Self>,
1143 ) {
1144 let Some((ix, call)) = self.tool_call_mut(&id) else {
1145 return;
1146 };
1147
1148 let new_status = match option_kind {
1149 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1150 ToolCallStatus::Rejected
1151 }
1152 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1153 ToolCallStatus::InProgress
1154 }
1155 };
1156
1157 let curr_status = mem::replace(&mut call.status, new_status);
1158
1159 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1160 respond_tx.send(option_id).log_err();
1161 } else if cfg!(debug_assertions) {
1162 panic!("tried to authorize an already authorized tool call");
1163 }
1164
1165 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1166 }
1167
1168 /// Returns true if the last turn is awaiting tool authorization
1169 pub fn waiting_for_tool_confirmation(&self) -> bool {
1170 for entry in self.entries.iter().rev() {
1171 match &entry {
1172 AgentThreadEntry::ToolCall(call) => match call.status {
1173 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1174 ToolCallStatus::Pending
1175 | ToolCallStatus::InProgress
1176 | ToolCallStatus::Completed
1177 | ToolCallStatus::Failed
1178 | ToolCallStatus::Rejected
1179 | ToolCallStatus::Canceled => continue,
1180 },
1181 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1182 // Reached the beginning of the turn
1183 return false;
1184 }
1185 }
1186 }
1187 false
1188 }
1189
1190 pub fn plan(&self) -> &Plan {
1191 &self.plan
1192 }
1193
1194 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1195 let new_entries_len = request.entries.len();
1196 let mut new_entries = request.entries.into_iter();
1197
1198 // Reuse existing markdown to prevent flickering
1199 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1200 let PlanEntry {
1201 content,
1202 priority,
1203 status,
1204 } = old;
1205 content.update(cx, |old, cx| {
1206 old.replace(new.content, cx);
1207 });
1208 *priority = new.priority;
1209 *status = new.status;
1210 }
1211 for new in new_entries {
1212 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1213 }
1214 self.plan.entries.truncate(new_entries_len);
1215
1216 cx.notify();
1217 }
1218
1219 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1220 self.plan
1221 .entries
1222 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1223 cx.notify();
1224 }
1225
1226 #[cfg(any(test, feature = "test-support"))]
1227 pub fn send_raw(
1228 &mut self,
1229 message: &str,
1230 cx: &mut Context<Self>,
1231 ) -> BoxFuture<'static, Result<()>> {
1232 self.send(
1233 vec![acp::ContentBlock::Text(acp::TextContent {
1234 text: message.to_string(),
1235 annotations: None,
1236 })],
1237 cx,
1238 )
1239 }
1240
1241 pub fn send(
1242 &mut self,
1243 message: Vec<acp::ContentBlock>,
1244 cx: &mut Context<Self>,
1245 ) -> BoxFuture<'static, Result<()>> {
1246 let block = ContentBlock::new_combined(
1247 message.clone(),
1248 self.project.read(cx).languages().clone(),
1249 cx,
1250 );
1251 let request = acp::PromptRequest {
1252 prompt: message.clone(),
1253 session_id: self.session_id.clone(),
1254 };
1255 let git_store = self.project.read(cx).git_store().clone();
1256
1257 let message_id = if self
1258 .connection
1259 .session_editor(&self.session_id, cx)
1260 .is_some()
1261 {
1262 Some(UserMessageId::new())
1263 } else {
1264 None
1265 };
1266
1267 self.run_turn(cx, async move |this, cx| {
1268 this.update(cx, |this, cx| {
1269 this.push_entry(
1270 AgentThreadEntry::UserMessage(UserMessage {
1271 id: message_id.clone(),
1272 content: block,
1273 chunks: message,
1274 checkpoint: None,
1275 }),
1276 cx,
1277 );
1278 })
1279 .ok();
1280
1281 let old_checkpoint = git_store
1282 .update(cx, |git, cx| git.checkpoint(cx))?
1283 .await
1284 .context("failed to get old checkpoint")
1285 .log_err();
1286 this.update(cx, |this, cx| {
1287 if let Some((_ix, message)) = this.last_user_message() {
1288 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1289 git_checkpoint,
1290 show: false,
1291 });
1292 }
1293 this.connection.prompt(message_id, request, cx)
1294 })?
1295 .await
1296 })
1297 }
1298
1299 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1300 self.run_turn(cx, async move |this, cx| {
1301 this.update(cx, |this, cx| {
1302 this.connection
1303 .resume(&this.session_id, cx)
1304 .map(|resume| resume.run(cx))
1305 })?
1306 .context("resuming a session is not supported")?
1307 .await
1308 })
1309 }
1310
1311 fn run_turn(
1312 &mut self,
1313 cx: &mut Context<Self>,
1314 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1315 ) -> BoxFuture<'static, Result<()>> {
1316 self.clear_completed_plan_entries(cx);
1317
1318 let (tx, rx) = oneshot::channel();
1319 let cancel_task = self.cancel(cx);
1320
1321 self.send_task = Some(cx.spawn(async move |this, cx| {
1322 cancel_task.await;
1323 tx.send(f(this, cx).await).ok();
1324 }));
1325
1326 cx.spawn(async move |this, cx| {
1327 let response = rx.await;
1328
1329 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1330 .await?;
1331
1332 this.update(cx, |this, cx| {
1333 this.project
1334 .update(cx, |project, cx| project.set_agent_location(None, cx));
1335 match response {
1336 Ok(Err(e)) => {
1337 this.send_task.take();
1338 cx.emit(AcpThreadEvent::Error);
1339 Err(e)
1340 }
1341 result => {
1342 let canceled = matches!(
1343 result,
1344 Ok(Ok(acp::PromptResponse {
1345 stop_reason: acp::StopReason::Canceled
1346 }))
1347 );
1348
1349 // We only take the task if the current prompt wasn't canceled.
1350 //
1351 // This prompt may have been canceled because another one was sent
1352 // while it was still generating. In these cases, dropping `send_task`
1353 // would cause the next generation to be canceled.
1354 if !canceled {
1355 this.send_task.take();
1356 }
1357
1358 cx.emit(AcpThreadEvent::Stopped);
1359 Ok(())
1360 }
1361 }
1362 })?
1363 })
1364 .boxed()
1365 }
1366
1367 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1368 let Some(send_task) = self.send_task.take() else {
1369 return Task::ready(());
1370 };
1371
1372 for entry in self.entries.iter_mut() {
1373 if let AgentThreadEntry::ToolCall(call) = entry {
1374 let cancel = matches!(
1375 call.status,
1376 ToolCallStatus::Pending
1377 | ToolCallStatus::WaitingForConfirmation { .. }
1378 | ToolCallStatus::InProgress
1379 );
1380
1381 if cancel {
1382 call.status = ToolCallStatus::Canceled;
1383 }
1384 }
1385 }
1386
1387 self.connection.cancel(&self.session_id, cx);
1388
1389 // Wait for the send task to complete
1390 cx.foreground_executor().spawn(send_task)
1391 }
1392
1393 /// Rewinds this thread to before the entry at `index`, removing it and all
1394 /// subsequent entries while reverting any changes made from that point.
1395 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1396 let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1397 return Task::ready(Err(anyhow!("not supported")));
1398 };
1399 let Some(message) = self.user_message(&id) else {
1400 return Task::ready(Err(anyhow!("message not found")));
1401 };
1402
1403 let checkpoint = message
1404 .checkpoint
1405 .as_ref()
1406 .map(|c| c.git_checkpoint.clone());
1407
1408 let git_store = self.project.read(cx).git_store().clone();
1409 cx.spawn(async move |this, cx| {
1410 if let Some(checkpoint) = checkpoint {
1411 git_store
1412 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1413 .await?;
1414 }
1415
1416 cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1417 .await?;
1418 this.update(cx, |this, cx| {
1419 if let Some((ix, _)) = this.user_message_mut(&id) {
1420 let range = ix..this.entries.len();
1421 this.entries.truncate(ix);
1422 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1423 }
1424 })
1425 })
1426 }
1427
1428 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1429 let git_store = self.project.read(cx).git_store().clone();
1430
1431 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1432 if let Some(checkpoint) = message.checkpoint.as_ref() {
1433 checkpoint.git_checkpoint.clone()
1434 } else {
1435 return Task::ready(Ok(()));
1436 }
1437 } else {
1438 return Task::ready(Ok(()));
1439 };
1440
1441 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1442 cx.spawn(async move |this, cx| {
1443 let new_checkpoint = new_checkpoint
1444 .await
1445 .context("failed to get new checkpoint")
1446 .log_err();
1447 if let Some(new_checkpoint) = new_checkpoint {
1448 let equal = git_store
1449 .update(cx, |git, cx| {
1450 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1451 })?
1452 .await
1453 .unwrap_or(true);
1454 this.update(cx, |this, cx| {
1455 let (ix, message) = this.last_user_message().context("no user message")?;
1456 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1457 checkpoint.show = !equal;
1458 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1459 anyhow::Ok(())
1460 })??;
1461 }
1462
1463 Ok(())
1464 })
1465 }
1466
1467 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1468 self.entries
1469 .iter_mut()
1470 .enumerate()
1471 .rev()
1472 .find_map(|(ix, entry)| {
1473 if let AgentThreadEntry::UserMessage(message) = entry {
1474 Some((ix, message))
1475 } else {
1476 None
1477 }
1478 })
1479 }
1480
1481 fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1482 self.entries.iter().find_map(|entry| {
1483 if let AgentThreadEntry::UserMessage(message) = entry {
1484 if message.id.as_ref() == Some(id) {
1485 Some(message)
1486 } else {
1487 None
1488 }
1489 } else {
1490 None
1491 }
1492 })
1493 }
1494
1495 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1496 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1497 if let AgentThreadEntry::UserMessage(message) = entry {
1498 if message.id.as_ref() == Some(id) {
1499 Some((ix, message))
1500 } else {
1501 None
1502 }
1503 } else {
1504 None
1505 }
1506 })
1507 }
1508
1509 pub fn read_text_file(
1510 &self,
1511 path: PathBuf,
1512 line: Option<u32>,
1513 limit: Option<u32>,
1514 reuse_shared_snapshot: bool,
1515 cx: &mut Context<Self>,
1516 ) -> Task<Result<String>> {
1517 let project = self.project.clone();
1518 let action_log = self.action_log.clone();
1519 cx.spawn(async move |this, cx| {
1520 let load = project.update(cx, |project, cx| {
1521 let path = project
1522 .project_path_for_absolute_path(&path, cx)
1523 .context("invalid path")?;
1524 anyhow::Ok(project.open_buffer(path, cx))
1525 });
1526 let buffer = load??.await?;
1527
1528 let snapshot = if reuse_shared_snapshot {
1529 this.read_with(cx, |this, _| {
1530 this.shared_buffers.get(&buffer.clone()).cloned()
1531 })
1532 .log_err()
1533 .flatten()
1534 } else {
1535 None
1536 };
1537
1538 let snapshot = if let Some(snapshot) = snapshot {
1539 snapshot
1540 } else {
1541 action_log.update(cx, |action_log, cx| {
1542 action_log.buffer_read(buffer.clone(), cx);
1543 })?;
1544 project.update(cx, |project, cx| {
1545 let position = buffer
1546 .read(cx)
1547 .snapshot()
1548 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1549 project.set_agent_location(
1550 Some(AgentLocation {
1551 buffer: buffer.downgrade(),
1552 position,
1553 }),
1554 cx,
1555 );
1556 })?;
1557
1558 buffer.update(cx, |buffer, _| buffer.snapshot())?
1559 };
1560
1561 this.update(cx, |this, _| {
1562 let text = snapshot.text();
1563 this.shared_buffers.insert(buffer.clone(), snapshot);
1564 if line.is_none() && limit.is_none() {
1565 return Ok(text);
1566 }
1567 let limit = limit.unwrap_or(u32::MAX) as usize;
1568 let Some(line) = line else {
1569 return Ok(text.lines().take(limit).collect::<String>());
1570 };
1571
1572 let count = text.lines().count();
1573 if count < line as usize {
1574 anyhow::bail!("There are only {} lines", count);
1575 }
1576 Ok(text
1577 .lines()
1578 .skip(line as usize + 1)
1579 .take(limit)
1580 .collect::<String>())
1581 })?
1582 })
1583 }
1584
1585 pub fn write_text_file(
1586 &self,
1587 path: PathBuf,
1588 content: String,
1589 cx: &mut Context<Self>,
1590 ) -> Task<Result<()>> {
1591 let project = self.project.clone();
1592 let action_log = self.action_log.clone();
1593 cx.spawn(async move |this, cx| {
1594 let load = project.update(cx, |project, cx| {
1595 let path = project
1596 .project_path_for_absolute_path(&path, cx)
1597 .context("invalid path")?;
1598 anyhow::Ok(project.open_buffer(path, cx))
1599 });
1600 let buffer = load??.await?;
1601 let snapshot = this.update(cx, |this, cx| {
1602 this.shared_buffers
1603 .get(&buffer)
1604 .cloned()
1605 .unwrap_or_else(|| buffer.read(cx).snapshot())
1606 })?;
1607 let edits = cx
1608 .background_executor()
1609 .spawn(async move {
1610 let old_text = snapshot.text();
1611 text_diff(old_text.as_str(), &content)
1612 .into_iter()
1613 .map(|(range, replacement)| {
1614 (
1615 snapshot.anchor_after(range.start)
1616 ..snapshot.anchor_before(range.end),
1617 replacement,
1618 )
1619 })
1620 .collect::<Vec<_>>()
1621 })
1622 .await;
1623
1624 project.update(cx, |project, cx| {
1625 project.set_agent_location(
1626 Some(AgentLocation {
1627 buffer: buffer.downgrade(),
1628 position: edits
1629 .last()
1630 .map(|(range, _)| range.end)
1631 .unwrap_or(Anchor::MIN),
1632 }),
1633 cx,
1634 );
1635 })?;
1636
1637 let format_on_save = cx.update(|cx| {
1638 action_log.update(cx, |action_log, cx| {
1639 action_log.buffer_read(buffer.clone(), cx);
1640 });
1641
1642 let format_on_save = buffer.update(cx, |buffer, cx| {
1643 buffer.edit(edits, None, cx);
1644
1645 let settings = language::language_settings::language_settings(
1646 buffer.language().map(|l| l.name()),
1647 buffer.file(),
1648 cx,
1649 );
1650
1651 settings.format_on_save != FormatOnSave::Off
1652 });
1653 action_log.update(cx, |action_log, cx| {
1654 action_log.buffer_edited(buffer.clone(), cx);
1655 });
1656 format_on_save
1657 })?;
1658
1659 if format_on_save {
1660 let format_task = project.update(cx, |project, cx| {
1661 project.format(
1662 HashSet::from_iter([buffer.clone()]),
1663 LspFormatTarget::Buffers,
1664 false,
1665 FormatTrigger::Save,
1666 cx,
1667 )
1668 })?;
1669 format_task.await.log_err();
1670
1671 action_log.update(cx, |action_log, cx| {
1672 action_log.buffer_edited(buffer.clone(), cx);
1673 })?;
1674 }
1675
1676 project
1677 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1678 .await
1679 })
1680 }
1681
1682 pub fn to_markdown(&self, cx: &App) -> String {
1683 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1684 }
1685
1686 pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1687 cx.emit(AcpThreadEvent::ServerExited(status));
1688 }
1689}
1690
1691fn markdown_for_raw_output(
1692 raw_output: &serde_json::Value,
1693 language_registry: &Arc<LanguageRegistry>,
1694 cx: &mut App,
1695) -> Option<Entity<Markdown>> {
1696 match raw_output {
1697 serde_json::Value::Null => None,
1698 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1699 Markdown::new(
1700 value.to_string().into(),
1701 Some(language_registry.clone()),
1702 None,
1703 cx,
1704 )
1705 })),
1706 serde_json::Value::Number(value) => Some(cx.new(|cx| {
1707 Markdown::new(
1708 value.to_string().into(),
1709 Some(language_registry.clone()),
1710 None,
1711 cx,
1712 )
1713 })),
1714 serde_json::Value::String(value) => Some(cx.new(|cx| {
1715 Markdown::new(
1716 value.clone().into(),
1717 Some(language_registry.clone()),
1718 None,
1719 cx,
1720 )
1721 })),
1722 value => Some(cx.new(|cx| {
1723 Markdown::new(
1724 format!("```json\n{}\n```", value).into(),
1725 Some(language_registry.clone()),
1726 None,
1727 cx,
1728 )
1729 })),
1730 }
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735 use super::*;
1736 use anyhow::anyhow;
1737 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1738 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1739 use indoc::indoc;
1740 use project::{FakeFs, Fs};
1741 use rand::Rng as _;
1742 use serde_json::json;
1743 use settings::SettingsStore;
1744 use smol::stream::StreamExt as _;
1745 use std::{
1746 any::Any,
1747 cell::RefCell,
1748 path::Path,
1749 rc::Rc,
1750 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1751 time::Duration,
1752 };
1753 use util::path;
1754
1755 fn init_test(cx: &mut TestAppContext) {
1756 env_logger::try_init().ok();
1757 cx.update(|cx| {
1758 let settings_store = SettingsStore::test(cx);
1759 cx.set_global(settings_store);
1760 Project::init_settings(cx);
1761 language::init(cx);
1762 });
1763 }
1764
1765 #[gpui::test]
1766 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1767 init_test(cx);
1768
1769 let fs = FakeFs::new(cx.executor());
1770 let project = Project::test(fs, [], cx).await;
1771 let connection = Rc::new(FakeAgentConnection::new());
1772 let thread = cx
1773 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1774 .await
1775 .unwrap();
1776
1777 // Test creating a new user message
1778 thread.update(cx, |thread, cx| {
1779 thread.push_user_content_block(
1780 None,
1781 acp::ContentBlock::Text(acp::TextContent {
1782 annotations: None,
1783 text: "Hello, ".to_string(),
1784 }),
1785 cx,
1786 );
1787 });
1788
1789 thread.update(cx, |thread, cx| {
1790 assert_eq!(thread.entries.len(), 1);
1791 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1792 assert_eq!(user_msg.id, None);
1793 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1794 } else {
1795 panic!("Expected UserMessage");
1796 }
1797 });
1798
1799 // Test appending to existing user message
1800 let message_1_id = UserMessageId::new();
1801 thread.update(cx, |thread, cx| {
1802 thread.push_user_content_block(
1803 Some(message_1_id.clone()),
1804 acp::ContentBlock::Text(acp::TextContent {
1805 annotations: None,
1806 text: "world!".to_string(),
1807 }),
1808 cx,
1809 );
1810 });
1811
1812 thread.update(cx, |thread, cx| {
1813 assert_eq!(thread.entries.len(), 1);
1814 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1815 assert_eq!(user_msg.id, Some(message_1_id));
1816 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1817 } else {
1818 panic!("Expected UserMessage");
1819 }
1820 });
1821
1822 // Test creating new user message after assistant message
1823 thread.update(cx, |thread, cx| {
1824 thread.push_assistant_content_block(
1825 acp::ContentBlock::Text(acp::TextContent {
1826 annotations: None,
1827 text: "Assistant response".to_string(),
1828 }),
1829 false,
1830 cx,
1831 );
1832 });
1833
1834 let message_2_id = UserMessageId::new();
1835 thread.update(cx, |thread, cx| {
1836 thread.push_user_content_block(
1837 Some(message_2_id.clone()),
1838 acp::ContentBlock::Text(acp::TextContent {
1839 annotations: None,
1840 text: "New user message".to_string(),
1841 }),
1842 cx,
1843 );
1844 });
1845
1846 thread.update(cx, |thread, cx| {
1847 assert_eq!(thread.entries.len(), 3);
1848 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1849 assert_eq!(user_msg.id, Some(message_2_id));
1850 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1851 } else {
1852 panic!("Expected UserMessage at index 2");
1853 }
1854 });
1855 }
1856
1857 #[gpui::test]
1858 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1859 init_test(cx);
1860
1861 let fs = FakeFs::new(cx.executor());
1862 let project = Project::test(fs, [], cx).await;
1863 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1864 |_, thread, mut cx| {
1865 async move {
1866 thread.update(&mut cx, |thread, cx| {
1867 thread
1868 .handle_session_update(
1869 acp::SessionUpdate::AgentThoughtChunk {
1870 content: "Thinking ".into(),
1871 },
1872 cx,
1873 )
1874 .unwrap();
1875 thread
1876 .handle_session_update(
1877 acp::SessionUpdate::AgentThoughtChunk {
1878 content: "hard!".into(),
1879 },
1880 cx,
1881 )
1882 .unwrap();
1883 })?;
1884 Ok(acp::PromptResponse {
1885 stop_reason: acp::StopReason::EndTurn,
1886 })
1887 }
1888 .boxed_local()
1889 },
1890 ));
1891
1892 let thread = cx
1893 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1894 .await
1895 .unwrap();
1896
1897 thread
1898 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1899 .await
1900 .unwrap();
1901
1902 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1903 assert_eq!(
1904 output,
1905 indoc! {r#"
1906 ## User
1907
1908 Hello from Zed!
1909
1910 ## Assistant
1911
1912 <thinking>
1913 Thinking hard!
1914 </thinking>
1915
1916 "#}
1917 );
1918 }
1919
1920 #[gpui::test]
1921 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1922 init_test(cx);
1923
1924 let fs = FakeFs::new(cx.executor());
1925 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1926 .await;
1927 let project = Project::test(fs.clone(), [], cx).await;
1928 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1929 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1930 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1931 move |_, thread, mut cx| {
1932 let read_file_tx = read_file_tx.clone();
1933 async move {
1934 let content = thread
1935 .update(&mut cx, |thread, cx| {
1936 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1937 })
1938 .unwrap()
1939 .await
1940 .unwrap();
1941 assert_eq!(content, "one\ntwo\nthree\n");
1942 read_file_tx.take().unwrap().send(()).unwrap();
1943 thread
1944 .update(&mut cx, |thread, cx| {
1945 thread.write_text_file(
1946 path!("/tmp/foo").into(),
1947 "one\ntwo\nthree\nfour\nfive\n".to_string(),
1948 cx,
1949 )
1950 })
1951 .unwrap()
1952 .await
1953 .unwrap();
1954 Ok(acp::PromptResponse {
1955 stop_reason: acp::StopReason::EndTurn,
1956 })
1957 }
1958 .boxed_local()
1959 },
1960 ));
1961
1962 let (worktree, pathbuf) = project
1963 .update(cx, |project, cx| {
1964 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1965 })
1966 .await
1967 .unwrap();
1968 let buffer = project
1969 .update(cx, |project, cx| {
1970 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1971 })
1972 .await
1973 .unwrap();
1974
1975 let thread = cx
1976 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1977 .await
1978 .unwrap();
1979
1980 let request = thread.update(cx, |thread, cx| {
1981 thread.send_raw("Extend the count in /tmp/foo", cx)
1982 });
1983 read_file_rx.await.ok();
1984 buffer.update(cx, |buffer, cx| {
1985 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1986 });
1987 cx.run_until_parked();
1988 assert_eq!(
1989 buffer.read_with(cx, |buffer, _| buffer.text()),
1990 "zero\none\ntwo\nthree\nfour\nfive\n"
1991 );
1992 assert_eq!(
1993 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1994 "zero\none\ntwo\nthree\nfour\nfive\n"
1995 );
1996 request.await.unwrap();
1997 }
1998
1999 #[gpui::test]
2000 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2001 init_test(cx);
2002
2003 let fs = FakeFs::new(cx.executor());
2004 let project = Project::test(fs, [], cx).await;
2005 let id = acp::ToolCallId("test".into());
2006
2007 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2008 let id = id.clone();
2009 move |_, thread, mut cx| {
2010 let id = id.clone();
2011 async move {
2012 thread
2013 .update(&mut cx, |thread, cx| {
2014 thread.handle_session_update(
2015 acp::SessionUpdate::ToolCall(acp::ToolCall {
2016 id: id.clone(),
2017 title: "Label".into(),
2018 kind: acp::ToolKind::Fetch,
2019 status: acp::ToolCallStatus::InProgress,
2020 content: vec![],
2021 locations: vec![],
2022 raw_input: None,
2023 raw_output: None,
2024 }),
2025 cx,
2026 )
2027 })
2028 .unwrap()
2029 .unwrap();
2030 Ok(acp::PromptResponse {
2031 stop_reason: acp::StopReason::EndTurn,
2032 })
2033 }
2034 .boxed_local()
2035 }
2036 }));
2037
2038 let thread = cx
2039 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2040 .await
2041 .unwrap();
2042
2043 let request = thread.update(cx, |thread, cx| {
2044 thread.send_raw("Fetch https://example.com", cx)
2045 });
2046
2047 run_until_first_tool_call(&thread, cx).await;
2048
2049 thread.read_with(cx, |thread, _| {
2050 assert!(matches!(
2051 thread.entries[1],
2052 AgentThreadEntry::ToolCall(ToolCall {
2053 status: ToolCallStatus::InProgress,
2054 ..
2055 })
2056 ));
2057 });
2058
2059 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2060
2061 thread.read_with(cx, |thread, _| {
2062 assert!(matches!(
2063 &thread.entries[1],
2064 AgentThreadEntry::ToolCall(ToolCall {
2065 status: ToolCallStatus::Canceled,
2066 ..
2067 })
2068 ));
2069 });
2070
2071 thread
2072 .update(cx, |thread, cx| {
2073 thread.handle_session_update(
2074 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2075 id,
2076 fields: acp::ToolCallUpdateFields {
2077 status: Some(acp::ToolCallStatus::Completed),
2078 ..Default::default()
2079 },
2080 }),
2081 cx,
2082 )
2083 })
2084 .unwrap();
2085
2086 request.await.unwrap();
2087
2088 thread.read_with(cx, |thread, _| {
2089 assert!(matches!(
2090 thread.entries[1],
2091 AgentThreadEntry::ToolCall(ToolCall {
2092 status: ToolCallStatus::Completed,
2093 ..
2094 })
2095 ));
2096 });
2097 }
2098
2099 #[gpui::test]
2100 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2101 init_test(cx);
2102 let fs = FakeFs::new(cx.background_executor.clone());
2103 fs.insert_tree(path!("/test"), json!({})).await;
2104 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2105
2106 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2107 move |_, thread, mut cx| {
2108 async move {
2109 thread
2110 .update(&mut cx, |thread, cx| {
2111 thread.handle_session_update(
2112 acp::SessionUpdate::ToolCall(acp::ToolCall {
2113 id: acp::ToolCallId("test".into()),
2114 title: "Label".into(),
2115 kind: acp::ToolKind::Edit,
2116 status: acp::ToolCallStatus::Completed,
2117 content: vec![acp::ToolCallContent::Diff {
2118 diff: acp::Diff {
2119 path: "/test/test.txt".into(),
2120 old_text: None,
2121 new_text: "foo".into(),
2122 },
2123 }],
2124 locations: vec![],
2125 raw_input: None,
2126 raw_output: None,
2127 }),
2128 cx,
2129 )
2130 })
2131 .unwrap()
2132 .unwrap();
2133 Ok(acp::PromptResponse {
2134 stop_reason: acp::StopReason::EndTurn,
2135 })
2136 }
2137 .boxed_local()
2138 }
2139 }));
2140
2141 let thread = cx
2142 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2143 .await
2144 .unwrap();
2145
2146 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2147 .await
2148 .unwrap();
2149
2150 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2151 }
2152
2153 #[gpui::test(iterations = 10)]
2154 async fn test_checkpoints(cx: &mut TestAppContext) {
2155 init_test(cx);
2156 let fs = FakeFs::new(cx.background_executor.clone());
2157 fs.insert_tree(
2158 path!("/test"),
2159 json!({
2160 ".git": {}
2161 }),
2162 )
2163 .await;
2164 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2165
2166 let simulate_changes = Arc::new(AtomicBool::new(true));
2167 let next_filename = Arc::new(AtomicUsize::new(0));
2168 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2169 let simulate_changes = simulate_changes.clone();
2170 let next_filename = next_filename.clone();
2171 let fs = fs.clone();
2172 move |request, thread, mut cx| {
2173 let fs = fs.clone();
2174 let simulate_changes = simulate_changes.clone();
2175 let next_filename = next_filename.clone();
2176 async move {
2177 if simulate_changes.load(SeqCst) {
2178 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2179 fs.write(Path::new(&filename), b"").await?;
2180 }
2181
2182 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2183 panic!("expected text content block");
2184 };
2185 thread.update(&mut cx, |thread, cx| {
2186 thread
2187 .handle_session_update(
2188 acp::SessionUpdate::AgentMessageChunk {
2189 content: content.text.to_uppercase().into(),
2190 },
2191 cx,
2192 )
2193 .unwrap();
2194 })?;
2195 Ok(acp::PromptResponse {
2196 stop_reason: acp::StopReason::EndTurn,
2197 })
2198 }
2199 .boxed_local()
2200 }
2201 }));
2202 let thread = cx
2203 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2204 .await
2205 .unwrap();
2206
2207 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2208 .await
2209 .unwrap();
2210 thread.read_with(cx, |thread, cx| {
2211 assert_eq!(
2212 thread.to_markdown(cx),
2213 indoc! {"
2214 ## User (checkpoint)
2215
2216 Lorem
2217
2218 ## Assistant
2219
2220 LOREM
2221
2222 "}
2223 );
2224 });
2225 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2226
2227 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2228 .await
2229 .unwrap();
2230 thread.read_with(cx, |thread, cx| {
2231 assert_eq!(
2232 thread.to_markdown(cx),
2233 indoc! {"
2234 ## User (checkpoint)
2235
2236 Lorem
2237
2238 ## Assistant
2239
2240 LOREM
2241
2242 ## User (checkpoint)
2243
2244 ipsum
2245
2246 ## Assistant
2247
2248 IPSUM
2249
2250 "}
2251 );
2252 });
2253 assert_eq!(
2254 fs.files(),
2255 vec![
2256 Path::new(path!("/test/file-0")),
2257 Path::new(path!("/test/file-1"))
2258 ]
2259 );
2260
2261 // Checkpoint isn't stored when there are no changes.
2262 simulate_changes.store(false, SeqCst);
2263 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2264 .await
2265 .unwrap();
2266 thread.read_with(cx, |thread, cx| {
2267 assert_eq!(
2268 thread.to_markdown(cx),
2269 indoc! {"
2270 ## User (checkpoint)
2271
2272 Lorem
2273
2274 ## Assistant
2275
2276 LOREM
2277
2278 ## User (checkpoint)
2279
2280 ipsum
2281
2282 ## Assistant
2283
2284 IPSUM
2285
2286 ## User
2287
2288 dolor
2289
2290 ## Assistant
2291
2292 DOLOR
2293
2294 "}
2295 );
2296 });
2297 assert_eq!(
2298 fs.files(),
2299 vec![
2300 Path::new(path!("/test/file-0")),
2301 Path::new(path!("/test/file-1"))
2302 ]
2303 );
2304
2305 // Rewinding the conversation truncates the history and restores the checkpoint.
2306 thread
2307 .update(cx, |thread, cx| {
2308 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2309 panic!("unexpected entries {:?}", thread.entries)
2310 };
2311 thread.rewind(message.id.clone().unwrap(), cx)
2312 })
2313 .await
2314 .unwrap();
2315 thread.read_with(cx, |thread, cx| {
2316 assert_eq!(
2317 thread.to_markdown(cx),
2318 indoc! {"
2319 ## User (checkpoint)
2320
2321 Lorem
2322
2323 ## Assistant
2324
2325 LOREM
2326
2327 "}
2328 );
2329 });
2330 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
2331 }
2332
2333 async fn run_until_first_tool_call(
2334 thread: &Entity<AcpThread>,
2335 cx: &mut TestAppContext,
2336 ) -> usize {
2337 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2338
2339 let subscription = cx.update(|cx| {
2340 cx.subscribe(thread, move |thread, _, cx| {
2341 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2342 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2343 return tx.try_send(ix).unwrap();
2344 }
2345 }
2346 })
2347 });
2348
2349 select! {
2350 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2351 panic!("Timeout waiting for tool call")
2352 }
2353 ix = rx.next().fuse() => {
2354 drop(subscription);
2355 ix.unwrap()
2356 }
2357 }
2358 }
2359
2360 #[derive(Clone, Default)]
2361 struct FakeAgentConnection {
2362 auth_methods: Vec<acp::AuthMethod>,
2363 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2364 on_user_message: Option<
2365 Rc<
2366 dyn Fn(
2367 acp::PromptRequest,
2368 WeakEntity<AcpThread>,
2369 AsyncApp,
2370 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2371 + 'static,
2372 >,
2373 >,
2374 }
2375
2376 impl FakeAgentConnection {
2377 fn new() -> Self {
2378 Self {
2379 auth_methods: Vec::new(),
2380 on_user_message: None,
2381 sessions: Arc::default(),
2382 }
2383 }
2384
2385 #[expect(unused)]
2386 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2387 self.auth_methods = auth_methods;
2388 self
2389 }
2390
2391 fn on_user_message(
2392 mut self,
2393 handler: impl Fn(
2394 acp::PromptRequest,
2395 WeakEntity<AcpThread>,
2396 AsyncApp,
2397 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2398 + 'static,
2399 ) -> Self {
2400 self.on_user_message.replace(Rc::new(handler));
2401 self
2402 }
2403 }
2404
2405 impl AgentConnection for FakeAgentConnection {
2406 fn auth_methods(&self) -> &[acp::AuthMethod] {
2407 &self.auth_methods
2408 }
2409
2410 fn new_thread(
2411 self: Rc<Self>,
2412 project: Entity<Project>,
2413 _cwd: &Path,
2414 cx: &mut App,
2415 ) -> Task<gpui::Result<Entity<AcpThread>>> {
2416 let session_id = acp::SessionId(
2417 rand::thread_rng()
2418 .sample_iter(&rand::distributions::Alphanumeric)
2419 .take(7)
2420 .map(char::from)
2421 .collect::<String>()
2422 .into(),
2423 );
2424 let action_log = cx.new(|_| ActionLog::new(project.clone()));
2425 let thread = cx.new(|_cx| {
2426 AcpThread::new(
2427 "Test",
2428 self.clone(),
2429 project,
2430 action_log,
2431 session_id.clone(),
2432 )
2433 });
2434 self.sessions.lock().insert(session_id, thread.downgrade());
2435 Task::ready(Ok(thread))
2436 }
2437
2438 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2439 if self.auth_methods().iter().any(|m| m.id == method) {
2440 Task::ready(Ok(()))
2441 } else {
2442 Task::ready(Err(anyhow!("Invalid Auth Method")))
2443 }
2444 }
2445
2446 fn prompt(
2447 &self,
2448 _id: Option<UserMessageId>,
2449 params: acp::PromptRequest,
2450 cx: &mut App,
2451 ) -> Task<gpui::Result<acp::PromptResponse>> {
2452 let sessions = self.sessions.lock();
2453 let thread = sessions.get(¶ms.session_id).unwrap();
2454 if let Some(handler) = &self.on_user_message {
2455 let handler = handler.clone();
2456 let thread = thread.clone();
2457 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2458 } else {
2459 Task::ready(Ok(acp::PromptResponse {
2460 stop_reason: acp::StopReason::EndTurn,
2461 }))
2462 }
2463 }
2464
2465 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2466 let sessions = self.sessions.lock();
2467 let thread = sessions.get(session_id).unwrap().clone();
2468
2469 cx.spawn(async move |cx| {
2470 thread
2471 .update(cx, |thread, cx| thread.cancel(cx))
2472 .unwrap()
2473 .await
2474 })
2475 .detach();
2476 }
2477
2478 fn session_editor(
2479 &self,
2480 session_id: &acp::SessionId,
2481 _cx: &mut App,
2482 ) -> Option<Rc<dyn AgentSessionEditor>> {
2483 Some(Rc::new(FakeAgentSessionEditor {
2484 _session_id: session_id.clone(),
2485 }))
2486 }
2487
2488 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2489 self
2490 }
2491 }
2492
2493 struct FakeAgentSessionEditor {
2494 _session_id: acp::SessionId,
2495 }
2496
2497 impl AgentSessionEditor for FakeAgentSessionEditor {
2498 fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2499 Task::ready(Ok(()))
2500 }
2501 }
2502}