1mod connection;
2pub use connection::*;
3
4use agent_client_protocol as acp;
5use agentic_coding_protocol::{self as acp_old, ToolCallConfirmationOutcome};
6use anyhow::{Context as _, Result};
7use assistant_tool::ActionLog;
8use buffer_diff::BufferDiff;
9use editor::{Bias, MultiBuffer, PathKey};
10use futures::{FutureExt, channel::oneshot, future::BoxFuture};
11use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
12use itertools::{Itertools, PeekingNext};
13use language::{
14 Anchor, Buffer, BufferSnapshot, Capability, LanguageRegistry, OffsetRangeExt as _, Point,
15 text_diff,
16};
17use markdown::Markdown;
18use project::{AgentLocation, Project};
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::error::Error;
22use std::fmt::Formatter;
23use std::rc::Rc;
24use std::{
25 fmt::Display,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use ui::App;
31use util::ResultExt;
32
33#[derive(Debug)]
34pub struct UserMessage {
35 pub content: ContentBlock,
36}
37
38impl UserMessage {
39 pub fn from_acp(
40 message: impl IntoIterator<Item = acp::ContentBlock>,
41 language_registry: Arc<LanguageRegistry>,
42 cx: &mut App,
43 ) -> Self {
44 let mut content = ContentBlock::Empty;
45 for chunk in message {
46 content.append(chunk, &language_registry, cx)
47 }
48 Self { content: content }
49 }
50
51 fn to_markdown(&self, cx: &App) -> String {
52 format!("## User\n{}\n", self.content.to_markdown(cx))
53 }
54}
55
/// A borrowed file path that can be rendered as, or parsed from, an
/// `@file:` mention link.
#[derive(Debug)]
pub struct MentionPath<'a>(&'a Path);

impl<'a> MentionPath<'a> {
    /// Prefix that marks a link target as a file mention.
    const PREFIX: &'static str = "@file:";

    /// Wraps `path` without copying it.
    pub fn new(path: &'a Path) -> Self {
        Self(path)
    }

    /// Parses a link target of the form `@file:<path>`.
    ///
    /// Returns `None` when `url` does not start with the mention prefix.
    pub fn try_parse(url: &'a str) -> Option<Self> {
        url.strip_prefix(Self::PREFIX)
            .map(|rest| Self(Path::new(rest)))
    }

    /// The wrapped path.
    pub fn path(&self) -> &Path {
        self.0
    }
}

impl Display for MentionPath<'_> {
    /// Writes a markdown link: the label is `@<file name>` and the target is
    /// the mention prefix followed by the full path.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let path = self.0;
        // A path without a final component yields an empty label.
        let file_name = path.file_name().unwrap_or_default();
        write!(
            f,
            "[@{}]({}{})",
            file_name.display(),
            Self::PREFIX,
            path.display()
        )
    }
}
87
88#[derive(Debug, PartialEq)]
89pub struct AssistantMessage {
90 pub chunks: Vec<AssistantMessageChunk>,
91}
92
93impl AssistantMessage {
94 pub fn to_markdown(&self, cx: &App) -> String {
95 format!(
96 "## Assistant\n\n{}\n\n",
97 self.chunks
98 .iter()
99 .map(|chunk| chunk.to_markdown(cx))
100 .join("\n\n")
101 )
102 }
103}
104
105#[derive(Debug, PartialEq)]
106pub enum AssistantMessageChunk {
107 Message { block: ContentBlock },
108 Thought { block: ContentBlock },
109}
110
111impl AssistantMessageChunk {
112 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
113 Self::Message {
114 block: ContentBlock::new(
115 acp::ContentBlock::Text(acp::TextContent {
116 text: chunk.to_owned().into(),
117 annotations: None,
118 }),
119 language_registry,
120 cx,
121 ),
122 }
123 }
124
125 fn to_markdown(&self, cx: &App) -> String {
126 match self {
127 Self::Message { block } => block.to_markdown(cx).to_string(),
128 Self::Thought { block } => {
129 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
130 }
131 }
132 }
133}
134
135#[derive(Debug)]
136pub enum AgentThreadEntry {
137 UserMessage(UserMessage),
138 AssistantMessage(AssistantMessage),
139 ToolCall(ToolCall),
140}
141
142impl AgentThreadEntry {
143 fn to_markdown(&self, cx: &App) -> String {
144 match self {
145 Self::UserMessage(message) => message.to_markdown(cx),
146 Self::AssistantMessage(message) => message.to_markdown(cx),
147 Self::ToolCall(too_call) => too_call.to_markdown(cx),
148 }
149 }
150
151 // todo! return all diffs?
152 pub fn first_diff(&self) -> Option<&Diff> {
153 if let AgentThreadEntry::ToolCall(call) = self {
154 call.first_diff()
155 } else {
156 None
157 }
158 }
159
160 pub fn locations(&self) -> Option<&[acp::ToolCallLocation]> {
161 if let AgentThreadEntry::ToolCall(ToolCall { locations, .. }) = self {
162 Some(locations)
163 } else {
164 None
165 }
166 }
167}
168
169#[derive(Debug)]
170pub struct ToolCall {
171 pub id: acp::ToolCallId,
172 pub label: Entity<Markdown>,
173 pub kind: acp::ToolKind,
174 pub content: Vec<ToolCallContent>,
175 pub status: ToolCallStatus,
176 pub locations: Vec<acp::ToolCallLocation>,
177}
178
179impl ToolCall {
180 fn from_acp(
181 tool_call: acp::ToolCall,
182 status: ToolCallStatus,
183 language_registry: Arc<LanguageRegistry>,
184 cx: &mut App,
185 ) -> Self {
186 Self {
187 id: tool_call.id,
188 label: cx.new(|cx| {
189 Markdown::new(
190 tool_call.label.into(),
191 Some(language_registry.clone()),
192 None,
193 cx,
194 )
195 }),
196 kind: tool_call.kind,
197 content: tool_call
198 .content
199 .into_iter()
200 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
201 .collect(),
202 locations: tool_call.locations,
203 status,
204 }
205 }
206
207 pub fn first_diff(&self) -> Option<&Diff> {
208 self.content.iter().find_map(|content| match content {
209 ToolCallContent::ContentBlock { .. } => None,
210 ToolCallContent::Diff { diff } => Some(diff),
211 })
212 }
213
214 fn to_markdown(&self, cx: &App) -> String {
215 let mut markdown = format!(
216 "**Tool Call: {}**\nStatus: {}\n\n",
217 self.label.read(cx).source(),
218 self.status
219 );
220 for content in &self.content {
221 markdown.push_str(content.to_markdown(cx).as_str());
222 markdown.push_str("\n\n");
223 }
224 markdown
225 }
226}
227
228#[derive(Debug)]
229pub enum ToolCallStatus {
230 WaitingForConfirmation {
231 options: Vec<acp::PermissionOption>,
232 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
233 },
234 Allowed {
235 status: acp::ToolCallStatus,
236 },
237 Rejected,
238 Canceled,
239}
240
241impl Display for ToolCallStatus {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 write!(
244 f,
245 "{}",
246 match self {
247 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
248 ToolCallStatus::Allowed { status } => match status {
249 acp::ToolCallStatus::InProgress => "In Progress",
250 acp::ToolCallStatus::Completed => "Completed",
251 acp::ToolCallStatus::Failed => "Failed",
252 },
253 ToolCallStatus::Rejected => "Rejected",
254 ToolCallStatus::Canceled => "Canceled",
255 }
256 )
257 }
258}
259
260#[derive(Debug, PartialEq, Clone)]
261enum ContentBlock {
262 Empty,
263 Markdown { markdown: Entity<Markdown> },
264}
265
266impl ContentBlock {
267 pub fn new(
268 block: acp::ContentBlock,
269 language_registry: &Arc<LanguageRegistry>,
270 cx: &mut App,
271 ) -> Self {
272 let mut this = Self::Empty;
273 this.append(block, language_registry, cx);
274 this
275 }
276
277 pub fn new_combined(
278 blocks: impl IntoIterator<Item = acp::ContentBlock>,
279 language_registry: Arc<LanguageRegistry>,
280 cx: &mut App,
281 ) -> Self {
282 let mut this = Self::Empty;
283 for block in blocks {
284 this.append(block, &language_registry, cx);
285 }
286 this
287 }
288
289 pub fn append(
290 &mut self,
291 block: acp::ContentBlock,
292 language_registry: &Arc<LanguageRegistry>,
293 cx: &mut App,
294 ) {
295 let new_content = match block {
296 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
297 acp::ContentBlock::ResourceLink(resource_link) => {
298 if let Some(path) = resource_link.uri.strip_prefix("file://") {
299 format!("{}", MentionPath(path.as_ref()))
300 } else {
301 resource_link.uri.clone()
302 }
303 }
304 acp::ContentBlock::Image(_)
305 | acp::ContentBlock::Audio(_)
306 | acp::ContentBlock::Resource(_) => String::new(),
307 };
308
309 match self {
310 ContentBlock::Empty => {
311 *self = ContentBlock::Markdown {
312 markdown: cx.new(|cx| {
313 Markdown::new(
314 new_content.into(),
315 Some(language_registry.clone()),
316 None,
317 cx,
318 )
319 }),
320 };
321 }
322 ContentBlock::Markdown { markdown } => {
323 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
324 }
325 }
326 }
327
328 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
329 match self {
330 ContentBlock::Empty => "",
331 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
332 }
333 }
334}
335
336#[derive(Debug)]
337pub enum ToolCallContent {
338 ContentBlock { content: ContentBlock },
339 Diff { diff: Diff },
340}
341
342impl ToolCallContent {
343 pub fn from_acp(
344 content: acp::ToolCallContent,
345 language_registry: Arc<LanguageRegistry>,
346 cx: &mut App,
347 ) -> Self {
348 match content {
349 acp::ToolCallContent::ContentBlock { content } => Self::ContentBlock {
350 content: ContentBlock::new(content, &language_registry, cx),
351 },
352 acp::ToolCallContent::Diff { diff } => Self::Diff {
353 diff: Diff::from_acp(diff, language_registry, cx),
354 },
355 }
356 }
357
358 pub fn from_acp_contents(
359 content: Vec<acp::ToolCallContent>,
360 language_registry: Arc<LanguageRegistry>,
361 cx: &mut App,
362 ) -> Vec<Self> {
363 content
364 .into_iter()
365 .peekable()
366 .batching(|it| match it.next()? {
367 acp::ToolCallContent::ContentBlock { content } => {
368 let mut block = ContentBlock::new(content, &language_registry, cx);
369 while let Some(acp::ToolCallContent::ContentBlock { content }) =
370 it.peeking_next(|c| matches!(c, acp::ToolCallContent::ContentBlock { .. }))
371 {
372 block.append(content, &language_registry, cx);
373 }
374 Some(ToolCallContent::ContentBlock { content: block })
375 }
376 content @ acp::ToolCallContent::Diff { .. } => Some(ToolCallContent::from_acp(
377 content,
378 language_registry.clone(),
379 cx,
380 )),
381 })
382 .collect()
383 }
384
385 pub fn to_markdown(&self, cx: &App) -> String {
386 match self {
387 Self::ContentBlock { content } => content.to_markdown(cx).to_string(),
388 Self::Diff { diff } => diff.to_markdown(cx),
389 }
390 }
391}
392
/// A diff between the old and new text of a file.
#[derive(Debug)]
pub struct Diff {
    /// Read-only multibuffer that receives excerpts of `new_buffer` around
    /// each changed hunk once the diff has been computed.
    pub multibuffer: Entity<MultiBuffer>,
    /// Path taken from the ACP diff.
    pub path: PathBuf,
    /// Buffer holding the new text.
    pub new_buffer: Entity<Buffer>,
    /// Buffer holding the old text (empty when the ACP diff had none).
    pub old_buffer: Entity<Buffer>,
    /// Handle to the task spawned in `from_acp` that fills `multibuffer`.
    _task: Task<Result<()>>,
}

impl Diff {
    /// Builds a `Diff` from an ACP diff.
    ///
    /// The buffers are created synchronously. Computing the diff, adding the
    /// hunk excerpts to the multibuffer and assigning a language to the new
    /// buffer happen in a task spawned on `cx`.
    pub fn from_acp(
        diff: acp::Diff,
        language_registry: Arc<LanguageRegistry>,
        cx: &mut App,
    ) -> Self {
        let acp::Diff {
            path,
            old_text,
            new_text,
        } = diff;

        let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));

        let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
        // A missing old text is treated as an empty file.
        let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
        let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
        let old_buffer_snapshot = old_buffer.read(cx).snapshot();
        let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
        // Diff the new buffer against the old text as its base.
        let diff_task = buffer_diff.update(cx, |diff, cx| {
            diff.set_base_text(
                old_buffer_snapshot,
                Some(language_registry.clone()),
                new_buffer_snapshot,
                cx,
            )
        });

        let task = cx.spawn({
            let multibuffer = multibuffer.clone();
            let path = path.clone();
            let new_buffer = new_buffer.clone();
            async move |cx| {
                diff_task.await?;

                multibuffer
                    .update(cx, |multibuffer, cx| {
                        // Collect the point range of every hunk in the new buffer.
                        let hunk_ranges = {
                            let buffer = new_buffer.read(cx);
                            let diff = buffer_diff.read(cx);
                            diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
                                .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
                                .collect::<Vec<_>>()
                        };

                        multibuffer.set_excerpts_for_path(
                            PathKey::for_buffer(&new_buffer, cx),
                            new_buffer.clone(),
                            hunk_ranges,
                            editor::DEFAULT_MULTIBUFFER_CONTEXT,
                            cx,
                        );
                        multibuffer.add_diff(buffer_diff.clone(), cx);
                    })
                    .log_err();

                // A failed language lookup is logged and otherwise ignored.
                if let Some(language) = language_registry
                    .language_for_file_path(&path)
                    .await
                    .log_err()
                {
                    new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
                }

                anyhow::Ok(())
            }
        });

        Self {
            multibuffer,
            path,
            new_buffer,
            old_buffer,
            _task: task,
        }
    }

    /// Renders the path followed by a fenced code block containing the full
    /// text of every buffer in the multibuffer, joined by newlines.
    fn to_markdown(&self, cx: &App) -> String {
        let buffer_text = self
            .multibuffer
            .read(cx)
            .all_buffers()
            .iter()
            .map(|buffer| buffer.read(cx).text())
            .join("\n");
        format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
    }
}
490
491#[derive(Debug, Default)]
492pub struct Plan {
493 pub entries: Vec<PlanEntry>,
494}
495
496#[derive(Debug)]
497pub struct PlanStats<'a> {
498 pub in_progress_entry: Option<&'a PlanEntry>,
499 pub pending: u32,
500 pub completed: u32,
501}
502
503impl Plan {
504 pub fn is_empty(&self) -> bool {
505 self.entries.is_empty()
506 }
507
508 pub fn stats(&self) -> PlanStats<'_> {
509 let mut stats = PlanStats {
510 in_progress_entry: None,
511 pending: 0,
512 completed: 0,
513 };
514
515 for entry in &self.entries {
516 match &entry.status {
517 acp::PlanEntryStatus::Pending => {
518 stats.pending += 1;
519 }
520 acp::PlanEntryStatus::InProgress => {
521 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
522 }
523 acp::PlanEntryStatus::Completed => {
524 stats.completed += 1;
525 }
526 }
527 }
528
529 stats
530 }
531}
532
533#[derive(Debug)]
534pub struct PlanEntry {
535 pub content: Entity<Markdown>,
536 pub priority: acp::PlanEntryPriority,
537 pub status: acp::PlanEntryStatus,
538}
539
540impl PlanEntry {
541 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
542 Self {
543 content: cx.new(|cx| Markdown::new_text(entry.content.into(), cx)),
544 priority: entry.priority,
545 status: entry.status,
546 }
547 }
548}
549
/// A conversation with an agent over an [`AgentConnection`].
pub struct AcpThread {
    title: SharedString,
    /// User messages, assistant messages and tool calls, in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    /// Snapshot of each buffer as it was last returned by `read_text_file`.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task for the prompt currently in flight; `None` when idle.
    send_task: Option<Task<()>>,
    connection: Arc<dyn AgentConnection>,
    /// Handed out once through `child_status()`.
    child_status: Option<Task<Result<()>>>,
    session_id: acp::SessionId,
}
562
/// Events emitted by [`AcpThread`] when its entries change.
pub enum AcpThreadEvent {
    /// An entry was pushed onto the end of the thread.
    NewEntry,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
569
/// Coarse state of a thread, as reported by `AcpThread::status`.
///
/// Derives `Debug`, `Clone` and `Copy` in addition to equality so the status
/// can be logged, used in `assert_eq!`, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadStatus {
    /// No prompt is in flight.
    Idle,
    /// A prompt is in flight and a tool call is awaiting user confirmation.
    WaitingForToolConfirmation,
    /// A prompt is in flight and nothing is awaiting confirmation.
    Generating,
}
576
577#[derive(Debug, Clone)]
578pub enum LoadError {
579 Unsupported {
580 error_message: SharedString,
581 upgrade_message: SharedString,
582 upgrade_command: String,
583 },
584 Exited(i32),
585 Other(SharedString),
586}
587
588impl Display for LoadError {
589 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
590 match self {
591 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
592 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
593 LoadError::Other(msg) => write!(f, "{}", msg),
594 }
595 }
596}
597
598impl Error for LoadError {}
599
600impl AcpThread {
601 pub fn new(
602 connection: impl AgentConnection + 'static,
603 title: SharedString,
604 child_status: Option<Task<Result<()>>>,
605 project: Entity<Project>,
606 session_id: acp::SessionId,
607 cx: &mut Context<Self>,
608 ) -> Self {
609 let action_log = cx.new(|_| ActionLog::new(project.clone()));
610
611 Self {
612 action_log,
613 shared_buffers: Default::default(),
614 entries: Default::default(),
615 plan: Default::default(),
616 title,
617 project,
618 send_task: None,
619 connection: Arc::new(connection),
620 child_status,
621 session_id,
622 }
623 }
624
    /// The action log created for this thread's project.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }

    /// The project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }

    /// A clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }

    /// All entries of the thread, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
640
641 pub fn status(&self) -> ThreadStatus {
642 if self.send_task.is_some() {
643 if self.waiting_for_tool_confirmation() {
644 ThreadStatus::WaitingForToolConfirmation
645 } else {
646 ThreadStatus::Generating
647 }
648 } else {
649 ThreadStatus::Idle
650 }
651 }
652
653 pub fn has_pending_edit_tool_calls(&self) -> bool {
654 for entry in self.entries.iter().rev() {
655 match entry {
656 AgentThreadEntry::UserMessage(_) => return false,
657 AgentThreadEntry::ToolCall(call) if call.first_diff().is_some() => return true,
658 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
659 }
660 }
661
662 false
663 }
664
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
669
    /// Adds a streamed chunk of assistant output to the thread.
    ///
    /// If the last entry is an assistant message, the chunk is merged into it:
    /// appended to the final chunk when that chunk has the same kind (message
    /// vs. thought), otherwise pushed as a new chunk. In both cases
    /// `EntryUpdated` is emitted for that entry. If the last entry is not an
    /// assistant message, a new assistant message entry is pushed.
    pub fn push_assistant_chunk(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        let language_registry = self.project.read(cx).languages().clone();
        // Captured before `last_mut()` takes a mutable borrow of `entries`.
        let entries_len = self.entries.len();
        if let Some(last_entry) = self.entries.last_mut()
            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
        {
            cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
            match (chunks.last_mut(), is_thought) {
                // Same kind as the trailing chunk: extend it in place.
                (Some(AssistantMessageChunk::Message { block }), false)
                | (Some(AssistantMessageChunk::Thought { block }), true) => {
                    block.append(chunk, &language_registry, cx)
                }
                // Kind changed (or there is no chunk yet): start a new chunk.
                _ => {
                    let block = ContentBlock::new(chunk, &language_registry, cx);
                    if is_thought {
                        chunks.push(AssistantMessageChunk::Thought { block })
                    } else {
                        chunks.push(AssistantMessageChunk::Message { block })
                    }
                }
            }
        } else {
            let block = ContentBlock::new(chunk, &language_registry, cx);
            let chunk = if is_thought {
                AssistantMessageChunk::Thought { block }
            } else {
                AssistantMessageChunk::Message { block }
            };

            self.push_entry(
                AgentThreadEntry::AssistantMessage(AssistantMessage {
                    chunks: vec![chunk],
                }),
                cx,
            );
        }
    }
712
713 pub fn update_tool_call(
714 &mut self,
715 tool_call: acp::ToolCall,
716 cx: &mut Context<Self>,
717 ) -> Result<()> {
718 let language_registry = self.project.read(cx).languages().clone();
719 let status = ToolCallStatus::Allowed {
720 status: tool_call.status,
721 };
722 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
723
724 let location = call.locations.last().cloned();
725
726 if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
727 match ¤t_call.status {
728 ToolCallStatus::WaitingForConfirmation { .. } => {
729 anyhow::bail!("Tool call hasn't been authorized yet")
730 }
731 ToolCallStatus::Rejected => {
732 anyhow::bail!("Tool call was rejected and therefore can't be updated")
733 }
734 ToolCallStatus::Allowed { .. } | ToolCallStatus::Canceled => {}
735 }
736
737 *current_call = call;
738
739 cx.emit(AcpThreadEvent::EntryUpdated(ix));
740 } else {
741 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
742 }
743
744 if let Some(location) = location {
745 self.set_project_location(location, cx)
746 }
747
748 Ok(())
749 }
750
751 fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
752 // todo! use map
753 self.entries
754 .iter()
755 .enumerate()
756 .rev()
757 .find_map(|(index, tool_call)| {
758 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
759 && &tool_call.id == id
760 {
761 Some((index, tool_call))
762 } else {
763 None
764 }
765 })
766 }
767
768 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
769 // todo! use map
770 self.entries
771 .iter_mut()
772 .enumerate()
773 .rev()
774 .find_map(|(index, tool_call)| {
775 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
776 && &tool_call.id == id
777 {
778 Some((index, tool_call))
779 } else {
780 None
781 }
782 })
783 }
784
785 pub fn request_tool_call_permission(
786 &mut self,
787 tool_call: acp::ToolCall,
788 options: Vec<acp::PermissionOption>,
789 cx: &mut Context<Self>,
790 ) -> oneshot::Receiver<acp::PermissionOptionId> {
791 let (tx, rx) = oneshot::channel();
792
793 let status = ToolCallStatus::WaitingForConfirmation {
794 options,
795 respond_tx: tx,
796 };
797
798 self.insert_tool_call(tool_call, status, cx);
799 rx
800 }
801
802 fn insert_tool_call(
803 &mut self,
804 tool_call: acp::ToolCall,
805 status: ToolCallStatus,
806 cx: &mut Context<Self>,
807 ) {
808 let language_registry = self.project.read(cx).languages().clone();
809 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
810
811 let location = call.locations.last().cloned();
812 if let Some(location) = location {
813 self.set_project_location(location, cx)
814 }
815
816 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
817 }
818
819 pub fn authorize_tool_call(
820 &mut self,
821 id: acp::ToolCallId,
822 option: acp::PermissionOption,
823 cx: &mut Context<Self>,
824 ) {
825 let Some((ix, call)) = self.tool_call_mut(&id) else {
826 return;
827 };
828
829 let new_status = match option.kind {
830 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
831 ToolCallStatus::Rejected
832 }
833 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
834 ToolCallStatus::Allowed {
835 status: acp::ToolCallStatus::InProgress,
836 }
837 }
838 };
839
840 let curr_status = mem::replace(&mut call.status, new_status);
841
842 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
843 respond_tx.send(option.id).log_err();
844 } else if cfg!(debug_assertions) {
845 panic!("tried to authorize an already authorized tool call");
846 }
847
848 cx.emit(AcpThreadEvent::EntryUpdated(ix));
849 }
850
    /// The agent's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
854
855 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
856 self.plan = Plan {
857 entries: request
858 .entries
859 .into_iter()
860 .map(|entry| PlanEntry::from_acp(entry, cx))
861 .collect(),
862 };
863
864 cx.notify();
865 }
866
867 pub fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
868 self.plan
869 .entries
870 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
871 cx.notify();
872 }
873
    /// Points the project's agent location at `location`.
    ///
    /// Does nothing if `location.path` cannot be resolved to a project path.
    /// Otherwise the buffer is opened in a detached task (errors are logged)
    /// and the agent location is set to the start of `location.line`, clipped
    /// to the buffer, or to the start of the buffer when no line is given.
    pub fn set_project_location(&self, location: acp::ToolCallLocation, cx: &mut Context<Self>) {
        self.project.update(cx, |project, cx| {
            let Some(path) = project.project_path_for_absolute_path(&location.path, cx) else {
                return;
            };
            let buffer = project.open_buffer(path, cx);
            cx.spawn(async move |project, cx| {
                let buffer = buffer.await?;

                project.update(cx, |project, cx| {
                    let position = if let Some(line) = location.line {
                        let snapshot = buffer.read(cx).snapshot();
                        // Clip so a line past the end of the buffer stays valid.
                        let point = snapshot.clip_point(Point::new(line, 0), Bias::Left);
                        snapshot.anchor_before(point)
                    } else {
                        Anchor::MIN
                    };

                    project.set_agent_location(
                        Some(AgentLocation {
                            buffer: buffer.downgrade(),
                            position,
                        }),
                        cx,
                    );
                })
            })
            .detach_and_log_err(cx);
        });
    }
904
905 /// Returns true if the last turn is awaiting tool authorization
906 pub fn waiting_for_tool_confirmation(&self) -> bool {
907 for entry in self.entries.iter().rev() {
908 match &entry {
909 AgentThreadEntry::ToolCall(call) => match call.status {
910 ToolCallStatus::WaitingForConfirmation { .. } => return true,
911 ToolCallStatus::Allowed { .. }
912 | ToolCallStatus::Rejected
913 | ToolCallStatus::Canceled => continue,
914 },
915 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
916 // Reached the beginning of the turn
917 return false;
918 }
919 }
920 }
921 false
922 }
923
    /// Starts authentication on the underlying connection and returns its
    /// future; the future does not borrow `self`.
    pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
        self.connection.authenticate()
    }
927
928 #[cfg(any(test, feature = "test-support"))]
929 pub fn send_raw(
930 &mut self,
931 message: &str,
932 cx: &mut Context<Self>,
933 ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
934 self.send(
935 vec![acp::ContentBlock::Text(acp::TextContent {
936 text: message.to_string(),
937 annotations: None,
938 })],
939 cx,
940 )
941 }
942
    /// Appends `message` to the thread as a user entry and sends it to the
    /// agent as a prompt.
    ///
    /// Any send already in flight is cancelled first. The spawned task is
    /// stored in `send_task` until the prompt finishes. The returned future
    /// resolves to the connection's error if the prompt failed, and to
    /// `Ok(())` otherwise, including when the result channel is dropped
    /// before a result is sent.
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            cx,
        );
        self.push_entry(
            AgentThreadEntry::UserMessage(UserMessage { content: block }),
            cx,
        );

        let (tx, rx) = oneshot::channel();
        // Takes any previous send task and asks the connection to cancel it.
        let cancel = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            async {
                // Wait for the previous turn's cancellation before prompting.
                cancel.await.log_err();

                let result = this
                    .update(cx, |this, _| {
                        this.connection.prompt(acp::PromptToolArguments {
                            prompt: message,
                            session_id: this.session_id.clone(),
                        })
                    })?
                    .await;
                tx.send(result).log_err();
                // Clearing the task returns the thread's status to idle.
                this.update(cx, |this, _cx| this.send_task.take())?;
                anyhow::Ok(())
            }
            .await
            .log_err();
        }));

        async move {
            match rx.await {
                Ok(Err(e)) => Err(e)?,
                _ => Ok(()),
            }
        }
        .boxed()
    }
989
    /// Cancels the prompt in flight, if any.
    ///
    /// When a send task is stored it is taken (and thereby dropped), the
    /// connection is asked to cancel, and once that request succeeds every
    /// tool call that is waiting for confirmation or in progress is marked
    /// `Canceled`. When nothing is in flight, a ready `Ok(())` task is
    /// returned.
    ///
    /// NOTE(review): no `EntryUpdated` event is emitted for the calls marked
    /// as canceled here; confirm observers do not rely on one.
    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<(), acp_old::Error>> {
        if self.send_task.take().is_some() {
            let request = self.connection.cancel();
            cx.spawn(async move |this, cx| {
                request.await?;
                this.update(cx, |this, _cx| {
                    for entry in this.entries.iter_mut() {
                        if let AgentThreadEntry::ToolCall(call) = entry {
                            let cancel = matches!(
                                call.status,
                                ToolCallStatus::WaitingForConfirmation { .. }
                                    | ToolCallStatus::Allowed {
                                        status: acp::ToolCallStatus::InProgress
                                    }
                            );

                            if cancel {
                                call.status = ToolCallStatus::Canceled;
                            }
                        }
                    }
                })?;
                Ok(())
            })
        } else {
            Task::ready(Ok(()))
        }
    }
1018
1019 pub fn read_text_file(
1020 &self,
1021 request: acp::ReadTextFileArguments,
1022 reuse_shared_snapshot: bool,
1023 cx: &mut Context<Self>,
1024 ) -> Task<Result<String>> {
1025 let project = self.project.clone();
1026 let action_log = self.action_log.clone();
1027 cx.spawn(async move |this, cx| {
1028 let load = project.update(cx, |project, cx| {
1029 let path = project
1030 .project_path_for_absolute_path(&request.path, cx)
1031 .context("invalid path")?;
1032 anyhow::Ok(project.open_buffer(path, cx))
1033 });
1034 let buffer = load??.await?;
1035
1036 let snapshot = if reuse_shared_snapshot {
1037 this.read_with(cx, |this, _| {
1038 this.shared_buffers.get(&buffer.clone()).cloned()
1039 })
1040 .log_err()
1041 .flatten()
1042 } else {
1043 None
1044 };
1045
1046 let snapshot = if let Some(snapshot) = snapshot {
1047 snapshot
1048 } else {
1049 action_log.update(cx, |action_log, cx| {
1050 action_log.buffer_read(buffer.clone(), cx);
1051 })?;
1052 project.update(cx, |project, cx| {
1053 let position = buffer
1054 .read(cx)
1055 .snapshot()
1056 .anchor_before(Point::new(request.line.unwrap_or_default(), 0));
1057 project.set_agent_location(
1058 Some(AgentLocation {
1059 buffer: buffer.downgrade(),
1060 position,
1061 }),
1062 cx,
1063 );
1064 })?;
1065
1066 buffer.update(cx, |buffer, _| buffer.snapshot())?
1067 };
1068
1069 this.update(cx, |this, _| {
1070 let text = snapshot.text();
1071 this.shared_buffers.insert(buffer.clone(), snapshot);
1072 if request.line.is_none() && request.limit.is_none() {
1073 return Ok(text);
1074 }
1075 let limit = request.limit.unwrap_or(u32::MAX) as usize;
1076 let Some(line) = request.line else {
1077 return Ok(text.lines().take(limit).collect::<String>());
1078 };
1079
1080 let count = text.lines().count();
1081 if count < line as usize {
1082 anyhow::bail!("There are only {} lines", count);
1083 }
1084 Ok(text
1085 .lines()
1086 .skip(line as usize + 1)
1087 .take(limit)
1088 .collect::<String>())
1089 })?
1090 })
1091 }
1092
    /// Replaces the contents of the project file at `request.path` with
    /// `request.content` and saves the buffer.
    ///
    /// The edits are computed as a text diff against the snapshot previously
    /// shared through `read_text_file` when one exists, and against the
    /// buffer's current snapshot otherwise. The project's agent location is
    /// moved to the end of the last edit (or the start of the buffer when
    /// there are no edits), and the action log records a read followed by an
    /// edit of the buffer.
    ///
    /// # Errors
    ///
    /// Fails if the path is not inside the project, or if opening, updating or
    /// saving the buffer fails.
    pub fn write_text_file(
        &self,
        request: acp::WriteTextFileToolArguments,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&request.path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Prefer the snapshot the agent last read, if any.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The diff is computed on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &request.content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;
            cx.update(|cx| {
                project.update(cx, |project, cx| {
                    project.set_agent_location(
                        Some(AgentLocation {
                            buffer: buffer.downgrade(),
                            position: edits
                                .last()
                                .map(|(range, _)| range.end)
                                .unwrap_or(Anchor::MIN),
                        }),
                        cx,
                    );
                });

                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });
                buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            })?;
            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
1159
    /// Takes the child-status task out of the thread; later calls return
    /// `None`.
    pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
        self.child_status.take()
    }
1163
1164 pub fn to_markdown(&self, cx: &App) -> String {
1165 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1166 }
1167}
1168
/// Client-side handler for the older `agentic_coding_protocol`, forwarding
/// requests to an [`AcpThread`].
#[derive(Clone)]
pub struct OldAcpClientDelegate {
    /// Weak handle, so the delegate does not keep the thread alive.
    thread: WeakEntity<AcpThread>,
    cx: AsyncApp,
    /// Counter shared between clones of the delegate; starts at zero.
    next_tool_call_id: Rc<RefCell<u64>>,
    // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
}
1176
1177impl OldAcpClientDelegate {
    /// Creates a delegate for `thread` with its tool-call id counter at zero.
    pub fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
        Self {
            thread,
            cx,
            next_tool_call_id: Rc::new(RefCell::new(0)),
        }
    }
1185
1186 pub async fn clear_completed_plan_entries(&self) -> Result<()> {
1187 let cx = &mut self.cx.clone();
1188 cx.update(|cx| {
1189 self.thread
1190 .update(cx, |thread, cx| thread.clear_completed_plan_entries(cx))
1191 })?
1192 .context("Failed to update thread")?;
1193
1194 Ok(())
1195 }
1196
1197 pub async fn request_existing_tool_call_confirmation(
1198 &self,
1199 tool_call_id: acp_old::ToolCallId,
1200 confirmation: acp_old::ToolCallConfirmation,
1201 ) -> Result<acp_old::ToolCallConfirmationOutcome> {
1202 let cx = &mut self.cx.clone();
1203
1204 let tool_call = into_new_tool_call(
1205 acp::ToolCallId(tool_call_id.0.to_string().into()),
1206 confirmation,
1207 );
1208
1209 let options = [
1210 acp_old::ToolCallConfirmationOutcome::Allow,
1211 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1212 acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer,
1213 acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool,
1214 acp_old::ToolCallConfirmationOutcome::Reject,
1215 ]
1216 .into_iter()
1217 .map(|outcome| match outcome {
1218 acp_old::ToolCallConfirmationOutcome::Allow => acp::PermissionOption {
1219 id: acp::PermissionOptionId(Arc::from("allow")),
1220 label: "Allow".to_string(),
1221 kind: acp::PermissionOptionKind::AllowOnce,
1222 },
1223 acp_old::ToolCallConfirmationOutcome::AlwaysAllow => acp::PermissionOption {
1224 id: acp::PermissionOptionId(Arc::from("always-allow")),
1225 label: "Always Allow".to_string(),
1226 kind: acp::PermissionOptionKind::AllowOnce,
1227 },
1228 acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer => acp::PermissionOption {
1229 id: acp::PermissionOptionId(Arc::from("always-allow-mcp-server")),
1230 label: "Always Allow MCP Server".to_string(),
1231 kind: acp::PermissionOptionKind::AllowOnce,
1232 },
1233 acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool => acp::PermissionOption {
1234 id: acp::PermissionOptionId(Arc::from("always-allow-tool")),
1235 label: "Always Allow Tool".to_string(),
1236 kind: acp::PermissionOptionKind::AllowOnce,
1237 },
1238 acp_old::ToolCallConfirmationOutcome::Reject => acp::PermissionOption {
1239 id: acp::PermissionOptionId(Arc::from("reject")),
1240 label: "Reject".to_string(),
1241 kind: acp::PermissionOptionKind::AllowOnce,
1242 },
1243 acp_old::ToolCallConfirmationOutcome::Cancel => unreachable!(),
1244 })
1245 .collect();
1246
1247 let outcome = cx
1248 .update(|cx| {
1249 self.thread.update(cx, |thread, cx| {
1250 thread.request_tool_call_permission(tool_call, options, cx)
1251 })
1252 })?
1253 .context("Failed to update thread")?;
1254
1255 let response = match outcome.await {
1256 Ok(result) => match result.0.as_ref() {
1257 "allow" => acp_old::ToolCallConfirmationOutcome::Allow,
1258 "always-allow" => acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1259 "always-allow-tool" => acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool,
1260 "always-allow-mcp-server" => {
1261 acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer
1262 }
1263 "reject" => acp_old::ToolCallConfirmationOutcome::Reject,
1264 },
1265 Err(oneshot::Canceled) => acp_old::ToolCallConfirmationOutcome::Cancel,
1266 };
1267
1268 Ok(response)
1269 }
1270
1271 pub async fn read_text_file_reusing_snapshot(
1272 &self,
1273 request: acp_old::ReadTextFileParams,
1274 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1275 let content = self
1276 .cx
1277 .update(|cx| {
1278 self.thread.update(cx, |thread, cx| {
1279 thread.read_text_file(
1280 acp::ReadTextFileArguments {
1281 path: request.path,
1282 line: request.line,
1283 limit: request.limit,
1284 },
1285 true,
1286 cx,
1287 )
1288 })
1289 })?
1290 .context("Failed to update thread")?
1291 .await?;
1292 Ok(acp_old::ReadTextFileResponse { content })
1293 }
1294}
1295impl acp_old::Client for OldAcpClientDelegate {
1296 async fn stream_assistant_message_chunk(
1297 &self,
1298 params: acp_old::StreamAssistantMessageChunkParams,
1299 ) -> Result<(), acp_old::Error> {
1300 let cx = &mut self.cx.clone();
1301
1302 cx.update(|cx| {
1303 self.thread
1304 .update(cx, |thread, cx| match params.chunk {
1305 acp_old::AssistantMessageChunk::Text { text } => {
1306 thread.push_assistant_chunk(text.into(), false, cx)
1307 }
1308 acp_old::AssistantMessageChunk::Thought { thought } => {
1309 thread.push_assistant_chunk(thought.into(), true, cx)
1310 }
1311 })
1312 .ok();
1313 })?;
1314
1315 Ok(())
1316 }
1317
1318 async fn request_tool_call_confirmation(
1319 &self,
1320 request: acp_old::RequestToolCallConfirmationParams,
1321 ) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
1322 let cx = &mut self.cx.clone();
1323 let ToolCallRequest { id, outcome } = cx
1324 .update(|cx| {
1325 self.thread
1326 .update(cx, |thread, cx| thread.request_new_tool_call(request, cx))
1327 })?
1328 .context("Failed to update thread")?;
1329
1330 Ok(acp_old::RequestToolCallConfirmationResponse {
1331 id,
1332 outcome: outcome.await.map_err(acp_old::Error::into_internal_error)?,
1333 })
1334 }
1335
1336 async fn push_tool_call(
1337 &self,
1338 request: acp_old::PushToolCallParams,
1339 ) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
1340 let cx = &mut self.cx.clone();
1341
1342 let old_acp_id = *self.next_tool_call_id.borrow() + 1;
1343 self.next_tool_call_id.replace(old_acp_id);
1344
1345 cx.update(|cx| {
1346 self.thread.update(cx, |thread, cx| {
1347 thread.update_tool_call(
1348 into_new_tool_call(acp::ToolCallId(old_acp_id.to_string().into()), request),
1349 cx,
1350 )
1351 })
1352 })?
1353 .context("Failed to update thread")?;
1354
1355 Ok(acp_old::PushToolCallResponse {
1356 id: acp_old::ToolCallId(old_acp_id),
1357 })
1358 }
1359
1360 async fn update_tool_call(
1361 &self,
1362 request: acp_old::UpdateToolCallParams,
1363 ) -> Result<(), acp_old::Error> {
1364 let cx = &mut self.cx.clone();
1365
1366 cx.update(|cx| {
1367 self.thread.update(cx, |thread, cx| {
1368 let languages = thread.project.read(cx).languages().clone();
1369
1370 if let Some((ix, tool_call)) = thread
1371 .tool_call_mut(&acp::ToolCallId(request.tool_call_id.0.to_string().into()))
1372 {
1373 tool_call.status = ToolCallStatus::Allowed {
1374 status: into_new_tool_call_status(request.status),
1375 };
1376 tool_call.content = request
1377 .content
1378 .into_iter()
1379 .map(|content| {
1380 ToolCallContent::from_acp(
1381 into_new_tool_call_content(content),
1382 languages.clone(),
1383 cx,
1384 )
1385 })
1386 .collect();
1387
1388 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1389 anyhow::Ok(())
1390 } else {
1391 anyhow::bail!("Tool call not found")
1392 }
1393 })
1394 })?
1395 .context("Failed to update thread")??;
1396
1397 Ok(())
1398 }
1399
1400 async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
1401 let cx = &mut self.cx.clone();
1402
1403 cx.update(|cx| {
1404 self.thread.update(cx, |thread, cx| {
1405 thread.update_plan(
1406 acp::Plan {
1407 entries: request
1408 .entries
1409 .into_iter()
1410 .map(into_new_plan_entry)
1411 .collect(),
1412 },
1413 cx,
1414 )
1415 })
1416 })?
1417 .context("Failed to update thread")?;
1418
1419 Ok(())
1420 }
1421
1422 async fn read_text_file(
1423 &self,
1424 request: acp_old::ReadTextFileParams,
1425 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1426 let content = self
1427 .cx
1428 .update(|cx| {
1429 self.thread.update(cx, |thread, cx| {
1430 thread.read_text_file(
1431 acp::ReadTextFileArguments {
1432 path: request.path,
1433 line: request.line,
1434 limit: request.limit,
1435 },
1436 false,
1437 cx,
1438 )
1439 })
1440 })?
1441 .context("Failed to update thread")?
1442 .await?;
1443 Ok(acp_old::ReadTextFileResponse { content })
1444 }
1445
1446 async fn write_text_file(
1447 &self,
1448 request: acp_old::WriteTextFileParams,
1449 ) -> Result<(), acp_old::Error> {
1450 self.cx
1451 .update(|cx| {
1452 self.thread.update(cx, |thread, cx| {
1453 thread.write_text_file(
1454 acp::WriteTextFileToolArguments {
1455 path: request.path,
1456 content: request.content,
1457 },
1458 cx,
1459 )
1460 })
1461 })?
1462 .context("Failed to update thread")?
1463 .await?;
1464
1465 Ok(())
1466 }
1467}
1468
1469fn into_new_tool_call(id: acp::ToolCallId, request: acp_old::PushToolCallParams) -> acp::ToolCall {
1470 acp::ToolCall {
1471 id: id,
1472 label: request.label,
1473 kind: acp_kind_from_old_icon(request.icon),
1474 status: acp::ToolCallStatus::InProgress,
1475 content: request
1476 .content
1477 .into_iter()
1478 .map(into_new_tool_call_content)
1479 .collect(),
1480 locations: request
1481 .locations
1482 .into_iter()
1483 .map(into_new_tool_call_location)
1484 .collect(),
1485 }
1486}
1487
1488fn acp_kind_from_old_icon(icon: acp_old::Icon) -> acp::ToolKind {
1489 match icon {
1490 acp_old::Icon::FileSearch => acp::ToolKind::Search,
1491 acp_old::Icon::Folder => acp::ToolKind::Search,
1492 acp_old::Icon::Globe => acp::ToolKind::Search,
1493 acp_old::Icon::Hammer => acp::ToolKind::Other,
1494 acp_old::Icon::LightBulb => acp::ToolKind::Think,
1495 acp_old::Icon::Pencil => acp::ToolKind::Edit,
1496 acp_old::Icon::Regex => acp::ToolKind::Search,
1497 acp_old::Icon::Terminal => acp::ToolKind::Execute,
1498 }
1499}
1500
1501fn into_new_tool_call_status(status: acp_old::ToolCallStatus) -> acp::ToolCallStatus {
1502 match status {
1503 acp_old::ToolCallStatus::Running => acp::ToolCallStatus::InProgress,
1504 acp_old::ToolCallStatus::Finished => acp::ToolCallStatus::Completed,
1505 acp_old::ToolCallStatus::Error => acp::ToolCallStatus::Failed,
1506 }
1507}
1508
1509fn into_new_tool_call_content(content: acp_old::ToolCallContent) -> acp::ToolCallContent {
1510 match content {
1511 acp_old::ToolCallContent::Markdown { markdown } => acp::ToolCallContent::ContentBlock {
1512 content: acp::ContentBlock::Text(acp::TextContent {
1513 annotations: None,
1514 text: markdown,
1515 }),
1516 },
1517 acp_old::ToolCallContent::Diff { diff } => acp::ToolCallContent::Diff {
1518 diff: into_new_diff(diff),
1519 },
1520 }
1521}
1522
1523fn into_new_diff(diff: acp_old::Diff) -> acp::Diff {
1524 acp::Diff {
1525 path: diff.path,
1526 old_text: diff.old_text,
1527 new_text: diff.new_text,
1528 }
1529}
1530
1531fn into_new_tool_call_location(location: acp_old::ToolCallLocation) -> acp::ToolCallLocation {
1532 acp::ToolCallLocation {
1533 path: location.path,
1534 line: location.line,
1535 }
1536}
1537
1538fn into_new_plan(request: acp_old::UpdatePlanParams) -> acp::Plan {
1539 acp::Plan {
1540 entries: request
1541 .entries
1542 .into_iter()
1543 .map(into_new_plan_entry)
1544 .collect(),
1545 }
1546}
1547
1548fn into_new_plan_entry(entry: acp_old::PlanEntry) -> acp::PlanEntry {
1549 acp::PlanEntry {
1550 content: entry.content,
1551 priority: into_new_plan_priority(entry.priority),
1552 status: into_new_plan_status(entry.status),
1553 }
1554}
1555
1556fn into_new_plan_priority(priority: acp_old::PlanEntryPriority) -> acp::PlanEntryPriority {
1557 match priority {
1558 acp_old::PlanEntryPriority::Low => acp::PlanEntryPriority::Low,
1559 acp_old::PlanEntryPriority::Medium => acp::PlanEntryPriority::Medium,
1560 acp_old::PlanEntryPriority::High => acp::PlanEntryPriority::High,
1561 }
1562}
1563
1564fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatus {
1565 match status {
1566 acp_old::PlanEntryStatus::Pending => acp::PlanEntryStatus::Pending,
1567 acp_old::PlanEntryStatus::InProgress => acp::PlanEntryStatus::InProgress,
1568 acp_old::PlanEntryStatus::Completed => acp::PlanEntryStatus::Completed,
1569 }
1570}
1571
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use async_pipe::{PipeReader, PipeWriter};
    use futures::{channel::mpsc, future::LocalBoxFuture, select};
    use gpui::{AsyncApp, TestAppContext};
    use indoc::indoc;
    use project::FakeFs;
    use serde_json::json;
    use settings::SettingsStore;
    use smol::{future::BoxedLocal, stream::StreamExt as _};
    use std::{cell::RefCell, rc::Rc, time::Duration};
    use util::path;

    /// Callback the fake agent runs for every user message it receives.
    type UserMessageHandler = Rc<
        dyn Fn(
            acp_old::SendUserMessageParams,
            Entity<FakeAcpServer>,
            AsyncApp,
        ) -> LocalBoxFuture<'static, Result<(), acp_old::Error>>,
    >;

    /// Sets up logging, a test settings store, and project/language state.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let store = SettingsStore::test(cx);
            cx.set_global(store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }

    /// Two consecutive thought chunks must end up in one `<thinking>` block.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| async move {
                // Stream the thought in two pieces, one after the other.
                for thought in ["Thinking ", "hard!"] {
                    server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
                                chunk: acp_old::AssistantMessageChunk::Thought {
                                    thought: thought.into(),
                                },
                            })
                        })?
                        .await
                        .unwrap();
                }

                Ok(())
            })
        });

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

            "#}
        );
    }

    /// An agent write must be merged with a user edit made after the agent
    /// read the file.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project.clone(), cx);
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        // Fires once the agent has read the file, so the user edit below is
        // guaranteed to land between the agent's read and its write.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));

        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    let response = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::ReadTextFileParams {
                                path: path!("/tmp/foo").into(),
                                line: None,
                                limit: None,
                            })
                        })?
                        .await
                        .unwrap();
                    assert_eq!(response.content, "one\ntwo\nthree\n");
                    read_file_tx.take().unwrap().send(()).unwrap();
                    server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::WriteTextFileParams {
                                path: path!("/tmp/foo").into(),
                                content: "one\ntwo\nthree\nfour\nfive\n".to_string(),
                            })
                        })?
                        .await
                        .unwrap();
                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();

        let expected = "zero\none\ntwo\nthree\nfour\nfive\n";
        assert_eq!(buffer.read_with(cx, |buffer, _| buffer.text()), expected);
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            expected
        );
        request.await.unwrap();
    }

    /// A tool call that was canceled can still be reported as finished by the
    /// agent afterwards, and then shows up as completed.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        // Keeps the agent's turn open until the sender is dropped.
        let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();

        let tool_call_id = Rc::new(RefCell::new(None));
        let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
        fake_server.update(cx, |fake_server, _| {
            let tool_call_id = tool_call_id.clone();
            fake_server.on_user_message(move |_, server, mut cx| {
                let end_turn_rx = end_turn_rx.clone();
                let tool_call_id = tool_call_id.clone();
                async move {
                    let pushed = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::PushToolCallParams {
                                label: "Fetch".to_string(),
                                icon: acp_old::Icon::Globe,
                                content: None,
                                locations: vec![],
                            })
                        })?
                        .await
                        .unwrap();
                    *tool_call_id.borrow_mut() = Some(pushed.id);
                    end_turn_rx.take().unwrap().await.ok();

                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::InProgress,
                        ..
                    },
                    ..
                })
            ));
        });

        cx.run_until_parked();

        thread
            .update(cx, |thread, cx| thread.cancel(cx))
            .await
            .unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        fake_server
            .update(cx, |fake_server, _| {
                fake_server.send_to_zed(acp_old::UpdateToolCallParams {
                    tool_call_id: tool_call_id.borrow().unwrap(),
                    status: acp_old::ToolCallStatus::Finished,
                    content: None,
                })
            })
            .await
            .unwrap();

        drop(end_turn_tx);
        request.await.unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::Completed,
                        ..
                    },
                    ..
                })
            ));
        });
    }

    /// Waits until the thread contains a tool call entry and returns its
    /// index, panicking after ten seconds without one.
    async fn run_until_first_tool_call(
        thread: &Entity<AcpThread>,
        cx: &mut TestAppContext,
    ) -> usize {
        let (mut tx, mut rx) = mpsc::channel::<usize>(1);

        let subscription = cx.update(|cx| {
            cx.subscribe(thread, move |thread, _, cx| {
                let first_tool_call = thread
                    .read(cx)
                    .entries
                    .iter()
                    .position(|entry| matches!(entry, AgentThreadEntry::ToolCall(_)));
                if let Some(ix) = first_tool_call {
                    tx.try_send(ix).unwrap();
                }
            })
        });

        select! {
            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
                panic!("Timeout waiting for tool call")
            }
            ix = rx.next().fuse() => {
                drop(subscription);
                ix.unwrap()
            }
        }
    }

    /// Creates a thread and a fake agent server connected to each other
    /// through a pair of in-memory pipes.
    pub fn fake_acp_thread(
        project: Entity<Project>,
        cx: &mut TestAppContext,
    ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
        let (stdin_tx, stdin_rx) = async_pipe::pipe();
        let (stdout_tx, stdout_rx) = async_pipe::pipe();

        let thread = cx.new(|cx| {
            let foreground_executor = cx.foreground_executor().clone();
            let delegate = OldAcpClientDelegate::new(cx.entity().downgrade(), cx.to_async());
            let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
                delegate,
                stdin_tx,
                stdout_rx,
                move |fut| {
                    foreground_executor.spawn(fut).detach();
                },
            );

            let io_task = cx.background_spawn(async move {
                io_fut.await.log_err();
                Ok(())
            });
            AcpThread::new(
                connection,
                "Test".into(),
                Some(io_task),
                project,
                acp::SessionId("test".into()),
                cx,
            )
        });
        let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
        (thread, agent)
    }

    /// Test double for an old-protocol agent process.
    pub struct FakeAcpServer {
        connection: acp_old::ClientConnection,

        _io_task: Task<()>,
        on_user_message: Option<UserMessageHandler>,
    }

    /// Agent half of the fake server; delegates user messages to the handler
    /// registered on the server entity.
    #[derive(Clone)]
    struct FakeAgent {
        server: Entity<FakeAcpServer>,
        cx: AsyncApp,
    }

    impl acp_old::Agent for FakeAgent {
        async fn initialize(
            &self,
            params: acp_old::InitializeParams,
        ) -> Result<acp_old::InitializeResponse, acp_old::Error> {
            Ok(acp_old::InitializeResponse {
                protocol_version: params.protocol_version,
                is_authenticated: true,
            })
        }

        async fn authenticate(&self) -> Result<(), acp_old::Error> {
            Ok(())
        }

        async fn cancel_send_message(&self) -> Result<(), acp_old::Error> {
            Ok(())
        }

        async fn send_user_message(
            &self,
            request: acp_old::SendUserMessageParams,
        ) -> Result<(), acp_old::Error> {
            let mut cx = self.cx.clone();
            let handler = self
                .server
                .update(&mut cx, |server, _| server.on_user_message.clone())
                .ok()
                .flatten();

            match handler {
                Some(handler) => handler(request, self.server.clone(), self.cx.clone()).await,
                None => Err(anyhow!("No handler for on_user_message").into()),
            }
        }
    }

    impl FakeAcpServer {
        /// Connects a fake agent to the given pipes and spawns its I/O loop
        /// in the background. No user-message handler is installed yet.
        fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
            let agent = FakeAgent {
                server: cx.entity(),
                cx: cx.to_async(),
            };
            let foreground_executor = cx.foreground_executor().clone();

            let (connection, io_fut) = acp_old::ClientConnection::connect_to_client(
                agent.clone(),
                stdout,
                stdin,
                move |fut| {
                    foreground_executor.spawn(fut).detach();
                },
            );
            let _io_task = cx.background_spawn(async move {
                io_fut.await.log_err();
            });

            FakeAcpServer {
                connection,
                on_user_message: None,
                _io_task,
            }
        }

        /// Installs the handler run for each incoming user message,
        /// replacing any previous one.
        fn on_user_message<F>(
            &mut self,
            handler: impl for<'a> Fn(
                acp_old::SendUserMessageParams,
                Entity<FakeAcpServer>,
                AsyncApp,
            ) -> F
            + 'static,
        ) where
            F: Future<Output = Result<(), acp_old::Error>> + 'static,
        {
            self.on_user_message
                .replace(Rc::new(move |request, server, cx| {
                    handler(request, server, cx).boxed_local()
                }));
        }

        /// Sends a client request to Zed over the connection and returns a
        /// future for its response, with errors converted to `anyhow`.
        fn send_to_zed<T: acp_old::ClientRequest + 'static>(
            &self,
            message: T,
        ) -> BoxedLocal<Result<T::Response>> {
            self.connection
                .request(message)
                .map(|response| response.map_err(|err| anyhow!(err)))
                .boxed_local()
        }
    }
}