1mod connection;
2pub use connection::*;
3
4use agent_client_protocol as acp;
5use agentic_coding_protocol as acp_old;
6use anyhow::{Context as _, Result};
7use assistant_tool::ActionLog;
8use buffer_diff::BufferDiff;
9use editor::{Bias, MultiBuffer, PathKey};
10use futures::{FutureExt, channel::oneshot, future::BoxFuture};
11use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
12use itertools::Itertools;
13use language::{
14 Anchor, Buffer, BufferSnapshot, Capability, LanguageRegistry, OffsetRangeExt as _, Point,
15 text_diff,
16};
17use markdown::Markdown;
18use project::{AgentLocation, Project};
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::error::Error;
22use std::fmt::Formatter;
23use std::rc::Rc;
24use std::{
25 fmt::Display,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use ui::App;
31use util::ResultExt;
32
33#[derive(Debug)]
34pub struct UserMessage {
35 pub content: ContentBlock,
36}
37
38impl UserMessage {
39 pub fn from_acp(
40 message: impl IntoIterator<Item = acp::ContentBlock>,
41 language_registry: Arc<LanguageRegistry>,
42 cx: &mut App,
43 ) -> Self {
44 let mut content = ContentBlock::Empty;
45 for chunk in message {
46 content.append(chunk, &language_registry, cx)
47 }
48 Self { content: content }
49 }
50
51 fn to_markdown(&self, cx: &App) -> String {
52 format!("## User\n\n{}\n\n", self.content.to_markdown(cx))
53 }
54}
55
/// A borrowed file path that is rendered as (and parsed from) an `@file:` mention link.
#[derive(Debug)]
pub struct MentionPath<'a>(&'a Path);

impl<'a> MentionPath<'a> {
    /// URL scheme-like prefix that marks a link target as a file mention.
    const PREFIX: &'static str = "@file:";

    /// Wraps `path` without copying it.
    pub fn new(path: &'a Path) -> Self {
        Self(path)
    }

    /// Returns the mentioned path when `url` starts with [`Self::PREFIX`], `None` otherwise.
    pub fn try_parse(url: &'a str) -> Option<Self> {
        url.strip_prefix(Self::PREFIX)
            .map(|rest| Self(Path::new(rest)))
    }

    /// The wrapped path.
    pub fn path(&self) -> &Path {
        self.0
    }
}

impl Display for MentionPath<'_> {
    /// Formats as a markdown link: `[@<file name>](@file:<full path>)`.
    /// A path without a final component produces an empty file name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let file_name = self.0.file_name().unwrap_or_default();
        let full_path = self.0.display();
        write!(
            f,
            "[@{}]({}{})",
            file_name.display(),
            Self::PREFIX,
            full_path
        )
    }
}
87
88#[derive(Debug, PartialEq)]
89pub struct AssistantMessage {
90 pub chunks: Vec<AssistantMessageChunk>,
91}
92
93impl AssistantMessage {
94 pub fn to_markdown(&self, cx: &App) -> String {
95 format!(
96 "## Assistant\n\n{}\n\n",
97 self.chunks
98 .iter()
99 .map(|chunk| chunk.to_markdown(cx))
100 .join("\n\n")
101 )
102 }
103}
104
105#[derive(Debug, PartialEq)]
106pub enum AssistantMessageChunk {
107 Message { block: ContentBlock },
108 Thought { block: ContentBlock },
109}
110
111impl AssistantMessageChunk {
112 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
113 Self::Message {
114 block: ContentBlock::new(chunk.into(), language_registry, cx),
115 }
116 }
117
118 fn to_markdown(&self, cx: &App) -> String {
119 match self {
120 Self::Message { block } => block.to_markdown(cx).to_string(),
121 Self::Thought { block } => {
122 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
123 }
124 }
125 }
126}
127
128#[derive(Debug)]
129pub enum AgentThreadEntry {
130 UserMessage(UserMessage),
131 AssistantMessage(AssistantMessage),
132 ToolCall(ToolCall),
133}
134
135impl AgentThreadEntry {
136 fn to_markdown(&self, cx: &App) -> String {
137 match self {
138 Self::UserMessage(message) => message.to_markdown(cx),
139 Self::AssistantMessage(message) => message.to_markdown(cx),
140 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
141 }
142 }
143
144 pub fn diffs(&self) -> impl Iterator<Item = &Diff> {
145 if let AgentThreadEntry::ToolCall(call) = self {
146 itertools::Either::Left(call.diffs())
147 } else {
148 itertools::Either::Right(std::iter::empty())
149 }
150 }
151
152 pub fn locations(&self) -> Option<&[acp::ToolCallLocation]> {
153 if let AgentThreadEntry::ToolCall(ToolCall { locations, .. }) = self {
154 Some(locations)
155 } else {
156 None
157 }
158 }
159}
160
161#[derive(Debug)]
162pub struct ToolCall {
163 pub id: acp::ToolCallId,
164 pub label: Entity<Markdown>,
165 pub kind: acp::ToolKind,
166 pub content: Vec<ToolCallContent>,
167 pub status: ToolCallStatus,
168 pub locations: Vec<acp::ToolCallLocation>,
169}
170
171impl ToolCall {
172 fn from_acp(
173 tool_call: acp::ToolCall,
174 status: ToolCallStatus,
175 language_registry: Arc<LanguageRegistry>,
176 cx: &mut App,
177 ) -> Self {
178 Self {
179 id: tool_call.id,
180 label: cx.new(|cx| {
181 Markdown::new(
182 tool_call.label.into(),
183 Some(language_registry.clone()),
184 None,
185 cx,
186 )
187 }),
188 kind: tool_call.kind,
189 content: tool_call
190 .content
191 .into_iter()
192 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
193 .collect(),
194 locations: tool_call.locations,
195 status,
196 }
197 }
198
199 pub fn diffs(&self) -> impl Iterator<Item = &Diff> {
200 self.content.iter().filter_map(|content| match content {
201 ToolCallContent::ContentBlock { .. } => None,
202 ToolCallContent::Diff { diff } => Some(diff),
203 })
204 }
205
206 fn to_markdown(&self, cx: &App) -> String {
207 let mut markdown = format!(
208 "**Tool Call: {}**\nStatus: {}\n\n",
209 self.label.read(cx).source(),
210 self.status
211 );
212 for content in &self.content {
213 markdown.push_str(content.to_markdown(cx).as_str());
214 markdown.push_str("\n\n");
215 }
216 markdown
217 }
218}
219
220#[derive(Debug)]
221pub enum ToolCallStatus {
222 WaitingForConfirmation {
223 options: Vec<acp::PermissionOption>,
224 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
225 },
226 Allowed {
227 status: acp::ToolCallStatus,
228 },
229 Rejected,
230 Canceled,
231}
232
233impl Display for ToolCallStatus {
234 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235 write!(
236 f,
237 "{}",
238 match self {
239 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
240 ToolCallStatus::Allowed { status } => match status {
241 acp::ToolCallStatus::InProgress => "In Progress",
242 acp::ToolCallStatus::Completed => "Completed",
243 acp::ToolCallStatus::Failed => "Failed",
244 },
245 ToolCallStatus::Rejected => "Rejected",
246 ToolCallStatus::Canceled => "Canceled",
247 }
248 )
249 }
250}
251
252#[derive(Debug, PartialEq, Clone)]
253pub enum ContentBlock {
254 Empty,
255 Markdown { markdown: Entity<Markdown> },
256}
257
258impl ContentBlock {
259 pub fn new(
260 block: acp::ContentBlock,
261 language_registry: &Arc<LanguageRegistry>,
262 cx: &mut App,
263 ) -> Self {
264 let mut this = Self::Empty;
265 this.append(block, language_registry, cx);
266 this
267 }
268
269 pub fn new_combined(
270 blocks: impl IntoIterator<Item = acp::ContentBlock>,
271 language_registry: Arc<LanguageRegistry>,
272 cx: &mut App,
273 ) -> Self {
274 let mut this = Self::Empty;
275 for block in blocks {
276 this.append(block, &language_registry, cx);
277 }
278 this
279 }
280
281 pub fn append(
282 &mut self,
283 block: acp::ContentBlock,
284 language_registry: &Arc<LanguageRegistry>,
285 cx: &mut App,
286 ) {
287 let new_content = match block {
288 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
289 acp::ContentBlock::ResourceLink(resource_link) => {
290 if let Some(path) = resource_link.uri.strip_prefix("file://") {
291 format!("{}", MentionPath(path.as_ref()))
292 } else {
293 resource_link.uri.clone()
294 }
295 }
296 acp::ContentBlock::Image(_)
297 | acp::ContentBlock::Audio(_)
298 | acp::ContentBlock::Resource(_) => String::new(),
299 };
300
301 match self {
302 ContentBlock::Empty => {
303 *self = ContentBlock::Markdown {
304 markdown: cx.new(|cx| {
305 Markdown::new(
306 new_content.into(),
307 Some(language_registry.clone()),
308 None,
309 cx,
310 )
311 }),
312 };
313 }
314 ContentBlock::Markdown { markdown } => {
315 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
316 }
317 }
318 }
319
320 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
321 match self {
322 ContentBlock::Empty => "",
323 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
324 }
325 }
326
327 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
328 match self {
329 ContentBlock::Empty => None,
330 ContentBlock::Markdown { markdown } => Some(markdown),
331 }
332 }
333}
334
335#[derive(Debug)]
336pub enum ToolCallContent {
337 ContentBlock { content: ContentBlock },
338 Diff { diff: Diff },
339}
340
341impl ToolCallContent {
342 pub fn from_acp(
343 content: acp::ToolCallContent,
344 language_registry: Arc<LanguageRegistry>,
345 cx: &mut App,
346 ) -> Self {
347 match content {
348 acp::ToolCallContent::ContentBlock { content } => Self::ContentBlock {
349 content: ContentBlock::new(content, &language_registry, cx),
350 },
351 acp::ToolCallContent::Diff { diff } => Self::Diff {
352 diff: Diff::from_acp(diff, language_registry, cx),
353 },
354 }
355 }
356
357 pub fn to_markdown(&self, cx: &App) -> String {
358 match self {
359 Self::ContentBlock { content } => content.to_markdown(cx).to_string(),
360 Self::Diff { diff } => diff.to_markdown(cx),
361 }
362 }
363}
364
365#[derive(Debug)]
366pub struct Diff {
367 pub multibuffer: Entity<MultiBuffer>,
368 pub path: PathBuf,
369 pub new_buffer: Entity<Buffer>,
370 pub old_buffer: Entity<Buffer>,
371 _task: Task<Result<()>>,
372}
373
374impl Diff {
375 pub fn from_acp(
376 diff: acp::Diff,
377 language_registry: Arc<LanguageRegistry>,
378 cx: &mut App,
379 ) -> Self {
380 let acp::Diff {
381 path,
382 old_text,
383 new_text,
384 } = diff;
385
386 let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
387
388 let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
389 let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
390 let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
391 let old_buffer_snapshot = old_buffer.read(cx).snapshot();
392 let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
393 let diff_task = buffer_diff.update(cx, |diff, cx| {
394 diff.set_base_text(
395 old_buffer_snapshot,
396 Some(language_registry.clone()),
397 new_buffer_snapshot,
398 cx,
399 )
400 });
401
402 let task = cx.spawn({
403 let multibuffer = multibuffer.clone();
404 let path = path.clone();
405 let new_buffer = new_buffer.clone();
406 async move |cx| {
407 diff_task.await?;
408
409 multibuffer
410 .update(cx, |multibuffer, cx| {
411 let hunk_ranges = {
412 let buffer = new_buffer.read(cx);
413 let diff = buffer_diff.read(cx);
414 diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
415 .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
416 .collect::<Vec<_>>()
417 };
418
419 multibuffer.set_excerpts_for_path(
420 PathKey::for_buffer(&new_buffer, cx),
421 new_buffer.clone(),
422 hunk_ranges,
423 editor::DEFAULT_MULTIBUFFER_CONTEXT,
424 cx,
425 );
426 multibuffer.add_diff(buffer_diff.clone(), cx);
427 })
428 .log_err();
429
430 if let Some(language) = language_registry
431 .language_for_file_path(&path)
432 .await
433 .log_err()
434 {
435 new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
436 }
437
438 anyhow::Ok(())
439 }
440 });
441
442 Self {
443 multibuffer,
444 path,
445 new_buffer,
446 old_buffer,
447 _task: task,
448 }
449 }
450
451 fn to_markdown(&self, cx: &App) -> String {
452 let buffer_text = self
453 .multibuffer
454 .read(cx)
455 .all_buffers()
456 .iter()
457 .map(|buffer| buffer.read(cx).text())
458 .join("\n");
459 format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
460 }
461}
462
463#[derive(Debug, Default)]
464pub struct Plan {
465 pub entries: Vec<PlanEntry>,
466}
467
468#[derive(Debug)]
469pub struct PlanStats<'a> {
470 pub in_progress_entry: Option<&'a PlanEntry>,
471 pub pending: u32,
472 pub completed: u32,
473}
474
475impl Plan {
476 pub fn is_empty(&self) -> bool {
477 self.entries.is_empty()
478 }
479
480 pub fn stats(&self) -> PlanStats<'_> {
481 let mut stats = PlanStats {
482 in_progress_entry: None,
483 pending: 0,
484 completed: 0,
485 };
486
487 for entry in &self.entries {
488 match &entry.status {
489 acp::PlanEntryStatus::Pending => {
490 stats.pending += 1;
491 }
492 acp::PlanEntryStatus::InProgress => {
493 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
494 }
495 acp::PlanEntryStatus::Completed => {
496 stats.completed += 1;
497 }
498 }
499 }
500
501 stats
502 }
503}
504
505#[derive(Debug)]
506pub struct PlanEntry {
507 pub content: Entity<Markdown>,
508 pub priority: acp::PlanEntryPriority,
509 pub status: acp::PlanEntryStatus,
510}
511
512impl PlanEntry {
513 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
514 Self {
515 content: cx.new(|cx| Markdown::new_text(entry.content.into(), cx)),
516 priority: entry.priority,
517 status: entry.status,
518 }
519 }
520}
521
/// State of one agent conversation: its transcript, plan, and the connection
/// used to talk to the agent.
pub struct AcpThread {
    /// Display title; initialized from the connection's name.
    title: SharedString,
    /// Transcript entries in the order they were pushed.
    entries: Vec<AgentThreadEntry>,
    /// The most recent plan received via `update_plan`.
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    /// Snapshots recorded by `read_text_file`, keyed by buffer; `write_text_file`
    /// diffs against the recorded snapshot when one exists.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task of the in-flight prompt, if any; `status` reports `Idle` when this is `None`.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
}
533
/// Events emitted by [`AcpThread`] when its transcript changes.
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the end of the transcript.
    NewEntry,
    /// The entry at the given index was modified in place.
    EntryUpdated(usize),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
540
/// Coarse activity state of a thread, as computed by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is in flight.
    Idle,
    /// A send task is in flight and a tool call in the latest turn awaits confirmation.
    WaitingForToolConfirmation,
    /// A send task is in flight and nothing is awaiting confirmation.
    Generating,
}
547
548#[derive(Debug, Clone)]
549pub enum LoadError {
550 Unsupported {
551 error_message: SharedString,
552 upgrade_message: SharedString,
553 upgrade_command: String,
554 },
555 Exited(i32),
556 Other(SharedString),
557}
558
559impl Display for LoadError {
560 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
561 match self {
562 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
563 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
564 LoadError::Other(msg) => write!(f, "{}", msg),
565 }
566 }
567}
568
569impl Error for LoadError {}
570
571impl AcpThread {
572 pub fn new(
573 connection: Rc<dyn AgentConnection>,
574 project: Entity<Project>,
575 session_id: acp::SessionId,
576 cx: &mut Context<Self>,
577 ) -> Self {
578 let action_log = cx.new(|_| ActionLog::new(project.clone()));
579
580 Self {
581 action_log,
582 shared_buffers: Default::default(),
583 entries: Default::default(),
584 plan: Default::default(),
585 title: connection.name().into(),
586 project,
587 send_task: None,
588 connection,
589 session_id,
590 }
591 }
592
    /// The action log created for this thread's project.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }

    /// The project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }

    /// The thread's display title (a clone of the stored value).
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }

    /// All transcript entries, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
608
609 pub fn status(&self) -> ThreadStatus {
610 if self.send_task.is_some() {
611 if self.waiting_for_tool_confirmation() {
612 ThreadStatus::WaitingForToolConfirmation
613 } else {
614 ThreadStatus::Generating
615 }
616 } else {
617 ThreadStatus::Idle
618 }
619 }
620
621 pub fn has_pending_edit_tool_calls(&self) -> bool {
622 for entry in self.entries.iter().rev() {
623 match entry {
624 AgentThreadEntry::UserMessage(_) => return false,
625 AgentThreadEntry::ToolCall(call) if call.diffs().next().is_some() => return true,
626 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
627 }
628 }
629
630 false
631 }
632
    /// Appends `entry` to the transcript and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
637
638 pub fn push_assistant_chunk(
639 &mut self,
640 chunk: acp::ContentBlock,
641 is_thought: bool,
642 cx: &mut Context<Self>,
643 ) {
644 let language_registry = self.project.read(cx).languages().clone();
645 let entries_len = self.entries.len();
646 if let Some(last_entry) = self.entries.last_mut()
647 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
648 {
649 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
650 match (chunks.last_mut(), is_thought) {
651 (Some(AssistantMessageChunk::Message { block }), false)
652 | (Some(AssistantMessageChunk::Thought { block }), true) => {
653 block.append(chunk, &language_registry, cx)
654 }
655 _ => {
656 let block = ContentBlock::new(chunk, &language_registry, cx);
657 if is_thought {
658 chunks.push(AssistantMessageChunk::Thought { block })
659 } else {
660 chunks.push(AssistantMessageChunk::Message { block })
661 }
662 }
663 }
664 } else {
665 let block = ContentBlock::new(chunk, &language_registry, cx);
666 let chunk = if is_thought {
667 AssistantMessageChunk::Thought { block }
668 } else {
669 AssistantMessageChunk::Message { block }
670 };
671
672 self.push_entry(
673 AgentThreadEntry::AssistantMessage(AssistantMessage {
674 chunks: vec![chunk],
675 }),
676 cx,
677 );
678 }
679 }
680
681 pub fn update_tool_call(
682 &mut self,
683 id: acp::ToolCallId,
684 status: acp::ToolCallStatus,
685 content: Option<Vec<acp::ToolCallContent>>,
686 cx: &mut Context<Self>,
687 ) -> Result<()> {
688 let languages = self.project.read(cx).languages().clone();
689 let (ix, current_call) = self.tool_call_mut(&id).context("Tool call not found")?;
690
691 if let Some(content) = content {
692 current_call.content = content
693 .into_iter()
694 .map(|chunk| ToolCallContent::from_acp(chunk, languages.clone(), cx))
695 .collect();
696 }
697 current_call.status = ToolCallStatus::Allowed { status };
698
699 cx.emit(AcpThreadEvent::EntryUpdated(ix));
700
701 Ok(())
702 }
703
704 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
705 pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
706 let status = ToolCallStatus::Allowed {
707 status: tool_call.status,
708 };
709 self.upsert_tool_call_inner(tool_call, status, cx)
710 }
711
712 pub fn upsert_tool_call_inner(
713 &mut self,
714 tool_call: acp::ToolCall,
715 status: ToolCallStatus,
716 cx: &mut Context<Self>,
717 ) {
718 let language_registry = self.project.read(cx).languages().clone();
719 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
720
721 let location = call.locations.last().cloned();
722
723 if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
724 *current_call = call;
725
726 cx.emit(AcpThreadEvent::EntryUpdated(ix));
727 } else {
728 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
729 }
730
731 if let Some(location) = location {
732 self.set_project_location(location, cx)
733 }
734 }
735
736 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
737 // todo! use map
738 self.entries
739 .iter_mut()
740 .enumerate()
741 .rev()
742 .find_map(|(index, tool_call)| {
743 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
744 && &tool_call.id == id
745 {
746 Some((index, tool_call))
747 } else {
748 None
749 }
750 })
751 }
752
753 pub fn request_tool_call_permission(
754 &mut self,
755 tool_call: acp::ToolCall,
756 options: Vec<acp::PermissionOption>,
757 cx: &mut Context<Self>,
758 ) -> oneshot::Receiver<acp::PermissionOptionId> {
759 let (tx, rx) = oneshot::channel();
760
761 let status = ToolCallStatus::WaitingForConfirmation {
762 options,
763 respond_tx: tx,
764 };
765
766 self.upsert_tool_call_inner(tool_call, status, cx);
767 rx
768 }
769
770 pub fn authorize_tool_call(
771 &mut self,
772 id: acp::ToolCallId,
773 option_id: acp::PermissionOptionId,
774 option_kind: acp::PermissionOptionKind,
775 cx: &mut Context<Self>,
776 ) {
777 let Some((ix, call)) = self.tool_call_mut(&id) else {
778 return;
779 };
780
781 let new_status = match option_kind {
782 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
783 ToolCallStatus::Rejected
784 }
785 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
786 ToolCallStatus::Allowed {
787 status: acp::ToolCallStatus::InProgress,
788 }
789 }
790 };
791
792 let curr_status = mem::replace(&mut call.status, new_status);
793
794 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
795 respond_tx.send(option_id).log_err();
796 } else if cfg!(debug_assertions) {
797 panic!("tried to authorize an already authorized tool call");
798 }
799
800 cx.emit(AcpThreadEvent::EntryUpdated(ix));
801 }
802
    /// The thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
806
807 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
808 self.plan = Plan {
809 entries: request
810 .entries
811 .into_iter()
812 .map(|entry| PlanEntry::from_acp(entry, cx))
813 .collect(),
814 };
815
816 cx.notify();
817 }
818
819 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
820 self.plan
821 .entries
822 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
823 cx.notify();
824 }
825
826 pub fn set_project_location(&self, location: acp::ToolCallLocation, cx: &mut Context<Self>) {
827 self.project.update(cx, |project, cx| {
828 let Some(path) = project.project_path_for_absolute_path(&location.path, cx) else {
829 return;
830 };
831 let buffer = project.open_buffer(path, cx);
832 cx.spawn(async move |project, cx| {
833 let buffer = buffer.await?;
834
835 project.update(cx, |project, cx| {
836 let position = if let Some(line) = location.line {
837 let snapshot = buffer.read(cx).snapshot();
838 let point = snapshot.clip_point(Point::new(line, 0), Bias::Left);
839 snapshot.anchor_before(point)
840 } else {
841 Anchor::MIN
842 };
843
844 project.set_agent_location(
845 Some(AgentLocation {
846 buffer: buffer.downgrade(),
847 position,
848 }),
849 cx,
850 );
851 })
852 })
853 .detach_and_log_err(cx);
854 });
855 }
856
857 /// Returns true if the last turn is awaiting tool authorization
858 pub fn waiting_for_tool_confirmation(&self) -> bool {
859 for entry in self.entries.iter().rev() {
860 match &entry {
861 AgentThreadEntry::ToolCall(call) => match call.status {
862 ToolCallStatus::WaitingForConfirmation { .. } => return true,
863 ToolCallStatus::Allowed { .. }
864 | ToolCallStatus::Rejected
865 | ToolCallStatus::Canceled => continue,
866 },
867 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
868 // Reached the beginning of the turn
869 return false;
870 }
871 }
872 }
873 false
874 }
875
    /// Delegates authentication to the underlying connection and returns its future.
    pub fn authenticate(&self, cx: &mut App) -> impl use<> + Future<Output = Result<()>> {
        self.connection.authenticate(cx)
    }
879
880 #[cfg(any(test, feature = "test-support"))]
881 pub fn send_raw(
882 &mut self,
883 message: &str,
884 cx: &mut Context<Self>,
885 ) -> BoxFuture<'static, Result<()>> {
886 self.send(
887 vec![acp::ContentBlock::Text(acp::TextContent {
888 text: message.to_string(),
889 annotations: None,
890 })],
891 cx,
892 )
893 }
894
    /// Sends a user message to the agent.
    ///
    /// Synchronously, this pushes the combined message as a user entry, drops
    /// completed plan entries, and cancels any in-flight send. It then stores
    /// a new send task that waits for that cancellation to finish, prompts the
    /// connection for this session, and clears `send_task` once the prompt
    /// resolves.
    ///
    /// The returned future resolves to the prompt's error if the prompt
    /// failed, and to `Ok(())` otherwise — including when the send task ended
    /// without delivering a result.
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            cx,
        );
        self.push_entry(
            AgentThreadEntry::UserMessage(UserMessage { content: block }),
            cx,
        );
        self.clear_completed_plan_entries(cx);

        // `tx` carries the prompt result from the send task to the returned future.
        let (tx, rx) = oneshot::channel();
        // Takes the previous send task (if any), so `send_task` is free to be reassigned below.
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            async {
                // Let the previous turn wind down before prompting again.
                cancel_task.await;

                let result = this
                    .update(cx, |this, cx| {
                        this.connection.prompt(
                            acp::PromptToolArguments {
                                prompt: message,
                                session_id: this.session_id.clone(),
                            },
                            cx,
                        )
                    })?
                    .await;
                tx.send(result).log_err();
                // The turn is over: clearing the task makes `status` report `Idle`.
                this.update(cx, |this, _cx| this.send_task.take())?;
                anyhow::Ok(())
            }
            .await
            .log_err();
        }));

        async move {
            match rx.await {
                Ok(Err(e)) => Err(e)?,
                // Either the prompt succeeded or the sender was dropped without a result.
                _ => Ok(()),
            }
        }
        .boxed()
    }
945
946 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
947 let Some(send_task) = self.send_task.take() else {
948 return Task::ready(());
949 };
950
951 for entry in self.entries.iter_mut() {
952 if let AgentThreadEntry::ToolCall(call) = entry {
953 let cancel = matches!(
954 call.status,
955 ToolCallStatus::WaitingForConfirmation { .. }
956 | ToolCallStatus::Allowed {
957 status: acp::ToolCallStatus::InProgress
958 }
959 );
960
961 if cancel {
962 call.status = ToolCallStatus::Canceled;
963 }
964 }
965 }
966
967 self.connection.cancel(&self.session_id, cx);
968
969 // Wait for the send task to complete
970 cx.foreground_executor().spawn(send_task)
971 }
972
973 pub fn read_text_file(
974 &self,
975 path: PathBuf,
976 line: Option<u32>,
977 limit: Option<u32>,
978 reuse_shared_snapshot: bool,
979 cx: &mut Context<Self>,
980 ) -> Task<Result<String>> {
981 let project = self.project.clone();
982 let action_log = self.action_log.clone();
983 cx.spawn(async move |this, cx| {
984 let load = project.update(cx, |project, cx| {
985 let path = project
986 .project_path_for_absolute_path(&path, cx)
987 .context("invalid path")?;
988 anyhow::Ok(project.open_buffer(path, cx))
989 });
990 let buffer = load??.await?;
991
992 let snapshot = if reuse_shared_snapshot {
993 this.read_with(cx, |this, _| {
994 this.shared_buffers.get(&buffer.clone()).cloned()
995 })
996 .log_err()
997 .flatten()
998 } else {
999 None
1000 };
1001
1002 let snapshot = if let Some(snapshot) = snapshot {
1003 snapshot
1004 } else {
1005 action_log.update(cx, |action_log, cx| {
1006 action_log.buffer_read(buffer.clone(), cx);
1007 })?;
1008 project.update(cx, |project, cx| {
1009 let position = buffer
1010 .read(cx)
1011 .snapshot()
1012 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1013 project.set_agent_location(
1014 Some(AgentLocation {
1015 buffer: buffer.downgrade(),
1016 position,
1017 }),
1018 cx,
1019 );
1020 })?;
1021
1022 buffer.update(cx, |buffer, _| buffer.snapshot())?
1023 };
1024
1025 this.update(cx, |this, _| {
1026 let text = snapshot.text();
1027 this.shared_buffers.insert(buffer.clone(), snapshot);
1028 if line.is_none() && limit.is_none() {
1029 return Ok(text);
1030 }
1031 let limit = limit.unwrap_or(u32::MAX) as usize;
1032 let Some(line) = line else {
1033 return Ok(text.lines().take(limit).collect::<String>());
1034 };
1035
1036 let count = text.lines().count();
1037 if count < line as usize {
1038 anyhow::bail!("There are only {} lines", count);
1039 }
1040 Ok(text
1041 .lines()
1042 .skip(line as usize + 1)
1043 .take(limit)
1044 .collect::<String>())
1045 })?
1046 })
1047 }
1048
    /// Replaces the contents of the project file at `path` with `content` and saves it.
    ///
    /// The edit is computed as a text diff between `content` and a base
    /// snapshot: the snapshot recorded in `shared_buffers` for this buffer if
    /// there is one, otherwise the buffer's current snapshot. The diff runs on
    /// the background executor. The project's agent location is moved to the
    /// end of the last edit (or the start of the buffer when there are no
    /// edits), and the action log is told about a read before the edit and
    /// about the edit afterwards.
    ///
    /// # Errors
    ///
    /// Fails with "invalid path" when `path` is not inside the project, or
    /// when opening or saving the buffer fails.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Base for the diff: the recorded snapshot if one exists for this buffer.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;
            cx.update(|cx| {
                project.update(cx, |project, cx| {
                    project.set_agent_location(
                        Some(AgentLocation {
                            buffer: buffer.downgrade(),
                            position: edits
                                .last()
                                .map(|(range, _)| range.end)
                                .unwrap_or(Anchor::MIN),
                        }),
                        cx,
                    );
                });

                // Order matters here: register the read, apply the edit, then register the edit.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });
                buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            })?;
            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
1116
1117 pub fn to_markdown(&self, cx: &App) -> String {
1118 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1119 }
1120}
1121
1122#[derive(Clone)]
1123pub struct OldAcpClientDelegate {
1124 thread: Rc<RefCell<WeakEntity<AcpThread>>>,
1125 cx: AsyncApp,
1126 next_tool_call_id: Rc<RefCell<u64>>,
1127 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
1128}
1129
1130impl OldAcpClientDelegate {
1131 pub fn new(thread: Rc<RefCell<WeakEntity<AcpThread>>>, cx: AsyncApp) -> Self {
1132 Self {
1133 thread,
1134 cx,
1135 next_tool_call_id: Rc::new(RefCell::new(0)),
1136 }
1137 }
1138}
1139
1140impl acp_old::Client for OldAcpClientDelegate {
1141 async fn stream_assistant_message_chunk(
1142 &self,
1143 params: acp_old::StreamAssistantMessageChunkParams,
1144 ) -> Result<(), acp_old::Error> {
1145 let cx = &mut self.cx.clone();
1146
1147 cx.update(|cx| {
1148 self.thread
1149 .borrow()
1150 .update(cx, |thread, cx| match params.chunk {
1151 acp_old::AssistantMessageChunk::Text { text } => {
1152 thread.push_assistant_chunk(text.into(), false, cx)
1153 }
1154 acp_old::AssistantMessageChunk::Thought { thought } => {
1155 thread.push_assistant_chunk(thought.into(), true, cx)
1156 }
1157 })
1158 .ok();
1159 })?;
1160
1161 Ok(())
1162 }
1163
1164 async fn request_tool_call_confirmation(
1165 &self,
1166 request: acp_old::RequestToolCallConfirmationParams,
1167 ) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
1168 let cx = &mut self.cx.clone();
1169
1170 let old_acp_id = *self.next_tool_call_id.borrow() + 1;
1171 self.next_tool_call_id.replace(old_acp_id);
1172
1173 let tool_call = into_new_tool_call(
1174 acp::ToolCallId(old_acp_id.to_string().into()),
1175 request.tool_call,
1176 );
1177
1178 let mut options = match request.confirmation {
1179 acp_old::ToolCallConfirmation::Edit { .. } => vec![(
1180 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1181 acp::PermissionOptionKind::AllowAlways,
1182 "Always Allow Edits".to_string(),
1183 )],
1184 acp_old::ToolCallConfirmation::Execute { root_command, .. } => vec![(
1185 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1186 acp::PermissionOptionKind::AllowAlways,
1187 format!("Always Allow {}", root_command),
1188 )],
1189 acp_old::ToolCallConfirmation::Mcp {
1190 server_name,
1191 tool_name,
1192 ..
1193 } => vec![
1194 (
1195 acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer,
1196 acp::PermissionOptionKind::AllowAlways,
1197 format!("Always Allow {}", server_name),
1198 ),
1199 (
1200 acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool,
1201 acp::PermissionOptionKind::AllowAlways,
1202 format!("Always Allow {}", tool_name),
1203 ),
1204 ],
1205 acp_old::ToolCallConfirmation::Fetch { .. } => vec![(
1206 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1207 acp::PermissionOptionKind::AllowAlways,
1208 "Always Allow".to_string(),
1209 )],
1210 acp_old::ToolCallConfirmation::Other { .. } => vec![(
1211 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1212 acp::PermissionOptionKind::AllowAlways,
1213 "Always Allow".to_string(),
1214 )],
1215 };
1216
1217 options.extend([
1218 (
1219 acp_old::ToolCallConfirmationOutcome::Allow,
1220 acp::PermissionOptionKind::AllowOnce,
1221 "Allow".to_string(),
1222 ),
1223 (
1224 acp_old::ToolCallConfirmationOutcome::Reject,
1225 acp::PermissionOptionKind::RejectOnce,
1226 "Reject".to_string(),
1227 ),
1228 ]);
1229
1230 let mut outcomes = Vec::with_capacity(options.len());
1231 let mut acp_options = Vec::with_capacity(options.len());
1232
1233 for (index, (outcome, kind, label)) in options.into_iter().enumerate() {
1234 outcomes.push(outcome);
1235 acp_options.push(acp::PermissionOption {
1236 id: acp::PermissionOptionId(index.to_string().into()),
1237 label,
1238 kind,
1239 })
1240 }
1241
1242 let response = cx
1243 .update(|cx| {
1244 self.thread.borrow().update(cx, |thread, cx| {
1245 thread.request_tool_call_permission(tool_call, acp_options, cx)
1246 })
1247 })?
1248 .context("Failed to update thread")?
1249 .await;
1250
1251 let outcome = match response {
1252 Ok(option_id) => outcomes[option_id.0.parse::<usize>().unwrap_or(0)],
1253 Err(oneshot::Canceled) => acp_old::ToolCallConfirmationOutcome::Cancel,
1254 };
1255
1256 Ok(acp_old::RequestToolCallConfirmationResponse {
1257 id: acp_old::ToolCallId(old_acp_id),
1258 outcome: outcome,
1259 })
1260 }
1261
1262 async fn push_tool_call(
1263 &self,
1264 request: acp_old::PushToolCallParams,
1265 ) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
1266 let cx = &mut self.cx.clone();
1267
1268 let old_acp_id = *self.next_tool_call_id.borrow() + 1;
1269 self.next_tool_call_id.replace(old_acp_id);
1270
1271 cx.update(|cx| {
1272 self.thread.borrow().update(cx, |thread, cx| {
1273 thread.upsert_tool_call(
1274 into_new_tool_call(acp::ToolCallId(old_acp_id.to_string().into()), request),
1275 cx,
1276 )
1277 })
1278 })?
1279 .context("Failed to update thread")?;
1280
1281 Ok(acp_old::PushToolCallResponse {
1282 id: acp_old::ToolCallId(old_acp_id),
1283 })
1284 }
1285
1286 async fn update_tool_call(
1287 &self,
1288 request: acp_old::UpdateToolCallParams,
1289 ) -> Result<(), acp_old::Error> {
1290 let cx = &mut self.cx.clone();
1291
1292 cx.update(|cx| {
1293 self.thread.borrow().update(cx, |thread, cx| {
1294 let languages = thread.project.read(cx).languages().clone();
1295
1296 if let Some((ix, tool_call)) = thread
1297 .tool_call_mut(&acp::ToolCallId(request.tool_call_id.0.to_string().into()))
1298 {
1299 tool_call.status = ToolCallStatus::Allowed {
1300 status: into_new_tool_call_status(request.status),
1301 };
1302 tool_call.content = request
1303 .content
1304 .into_iter()
1305 .map(|content| {
1306 ToolCallContent::from_acp(
1307 into_new_tool_call_content(content),
1308 languages.clone(),
1309 cx,
1310 )
1311 })
1312 .collect();
1313
1314 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1315 anyhow::Ok(())
1316 } else {
1317 anyhow::bail!("Tool call not found")
1318 }
1319 })
1320 })?
1321 .context("Failed to update thread")??;
1322
1323 Ok(())
1324 }
1325
1326 async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
1327 let cx = &mut self.cx.clone();
1328
1329 cx.update(|cx| {
1330 self.thread.borrow().update(cx, |thread, cx| {
1331 thread.update_plan(
1332 acp::Plan {
1333 entries: request
1334 .entries
1335 .into_iter()
1336 .map(into_new_plan_entry)
1337 .collect(),
1338 },
1339 cx,
1340 )
1341 })
1342 })?
1343 .context("Failed to update thread")?;
1344
1345 Ok(())
1346 }
1347
1348 async fn read_text_file(
1349 &self,
1350 acp_old::ReadTextFileParams { path, line, limit }: acp_old::ReadTextFileParams,
1351 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1352 let content = self
1353 .cx
1354 .update(|cx| {
1355 self.thread.borrow().update(cx, |thread, cx| {
1356 thread.read_text_file(path, line, limit, false, cx)
1357 })
1358 })?
1359 .context("Failed to update thread")?
1360 .await?;
1361 Ok(acp_old::ReadTextFileResponse { content })
1362 }
1363
1364 async fn write_text_file(
1365 &self,
1366 acp_old::WriteTextFileParams { path, content }: acp_old::WriteTextFileParams,
1367 ) -> Result<(), acp_old::Error> {
1368 self.cx
1369 .update(|cx| {
1370 self.thread
1371 .borrow()
1372 .update(cx, |thread, cx| thread.write_text_file(path, content, cx))
1373 })?
1374 .context("Failed to update thread")?
1375 .await?;
1376
1377 Ok(())
1378 }
1379}
1380
1381fn into_new_tool_call(id: acp::ToolCallId, request: acp_old::PushToolCallParams) -> acp::ToolCall {
1382 acp::ToolCall {
1383 id: id,
1384 label: request.label,
1385 kind: acp_kind_from_old_icon(request.icon),
1386 status: acp::ToolCallStatus::InProgress,
1387 content: request
1388 .content
1389 .into_iter()
1390 .map(into_new_tool_call_content)
1391 .collect(),
1392 locations: request
1393 .locations
1394 .into_iter()
1395 .map(into_new_tool_call_location)
1396 .collect(),
1397 }
1398}
1399
1400fn acp_kind_from_old_icon(icon: acp_old::Icon) -> acp::ToolKind {
1401 match icon {
1402 acp_old::Icon::FileSearch => acp::ToolKind::Search,
1403 acp_old::Icon::Folder => acp::ToolKind::Search,
1404 acp_old::Icon::Globe => acp::ToolKind::Search,
1405 acp_old::Icon::Hammer => acp::ToolKind::Other,
1406 acp_old::Icon::LightBulb => acp::ToolKind::Think,
1407 acp_old::Icon::Pencil => acp::ToolKind::Edit,
1408 acp_old::Icon::Regex => acp::ToolKind::Search,
1409 acp_old::Icon::Terminal => acp::ToolKind::Execute,
1410 }
1411}
1412
1413fn into_new_tool_call_status(status: acp_old::ToolCallStatus) -> acp::ToolCallStatus {
1414 match status {
1415 acp_old::ToolCallStatus::Running => acp::ToolCallStatus::InProgress,
1416 acp_old::ToolCallStatus::Finished => acp::ToolCallStatus::Completed,
1417 acp_old::ToolCallStatus::Error => acp::ToolCallStatus::Failed,
1418 }
1419}
1420
1421fn into_new_tool_call_content(content: acp_old::ToolCallContent) -> acp::ToolCallContent {
1422 match content {
1423 acp_old::ToolCallContent::Markdown { markdown } => acp::ToolCallContent::ContentBlock {
1424 content: acp::ContentBlock::Text(acp::TextContent {
1425 annotations: None,
1426 text: markdown,
1427 }),
1428 },
1429 acp_old::ToolCallContent::Diff { diff } => acp::ToolCallContent::Diff {
1430 diff: into_new_diff(diff),
1431 },
1432 }
1433}
1434
1435fn into_new_diff(diff: acp_old::Diff) -> acp::Diff {
1436 acp::Diff {
1437 path: diff.path,
1438 old_text: diff.old_text,
1439 new_text: diff.new_text,
1440 }
1441}
1442
1443fn into_new_tool_call_location(location: acp_old::ToolCallLocation) -> acp::ToolCallLocation {
1444 acp::ToolCallLocation {
1445 path: location.path,
1446 line: location.line,
1447 }
1448}
1449
1450fn into_new_plan_entry(entry: acp_old::PlanEntry) -> acp::PlanEntry {
1451 acp::PlanEntry {
1452 content: entry.content,
1453 priority: into_new_plan_priority(entry.priority),
1454 status: into_new_plan_status(entry.status),
1455 }
1456}
1457
1458fn into_new_plan_priority(priority: acp_old::PlanEntryPriority) -> acp::PlanEntryPriority {
1459 match priority {
1460 acp_old::PlanEntryPriority::Low => acp::PlanEntryPriority::Low,
1461 acp_old::PlanEntryPriority::Medium => acp::PlanEntryPriority::Medium,
1462 acp_old::PlanEntryPriority::High => acp::PlanEntryPriority::High,
1463 }
1464}
1465
1466fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatus {
1467 match status {
1468 acp_old::PlanEntryStatus::Pending => acp::PlanEntryStatus::Pending,
1469 acp_old::PlanEntryStatus::InProgress => acp::PlanEntryStatus::InProgress,
1470 acp_old::PlanEntryStatus::Completed => acp::PlanEntryStatus::Completed,
1471 }
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476 use super::*;
1477 use anyhow::anyhow;
1478 use async_pipe::{PipeReader, PipeWriter};
1479 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1480 use gpui::{AsyncApp, TestAppContext};
1481 use indoc::indoc;
1482 use project::FakeFs;
1483 use serde_json::json;
1484 use settings::SettingsStore;
1485 use smol::{future::BoxedLocal, stream::StreamExt as _};
1486 use std::{cell::RefCell, rc::Rc, time::Duration};
1487 use util::path;
1488
1489 fn init_test(cx: &mut TestAppContext) {
1490 env_logger::try_init().ok();
1491 cx.update(|cx| {
1492 let settings_store = SettingsStore::test(cx);
1493 cx.set_global(settings_store);
1494 Project::init_settings(cx);
1495 language::init(cx);
1496 });
1497 }
1498
    // Verifies that two thought chunks streamed back-to-back by the agent end up in a
    // single `<thinking>` section of the thread's markdown output.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        // The fake agent answers each user message with two separate thought chunks.
        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| async move {
                server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
                            chunk: acp_old::AssistantMessageChunk::Thought {
                                thought: "Thinking ".into(),
                            },
                        })
                    })?
                    .await
                    .unwrap();
                server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
                            chunk: acp_old::AssistantMessageChunk::Thought {
                                thought: "hard!".into(),
                            },
                        })
                    })?
                    .await
                    .unwrap();

                Ok(())
            })
        });

        // Sending a message runs the handler above to completion.
        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        // Both chunks must appear joined ("Thinking " + "hard!") inside one block.
        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
                ## User

                Hello from Zed!

                ## Assistant

                <thinking>
                Thinking hard!
                </thinking>

            "#}
        );
    }
1556
    // Verifies that a file write requested by the agent is combined with an edit the
    // user makes to the same buffer between the agent's read and its write: the final
    // buffer and the file on disk contain both changes.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project.clone(), cx);
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        // Signals the test body once the agent has finished reading the file.
        // Wrapped in `Rc<RefCell<Option<..>>>` so the `Fn` handler can take it once.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));

        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    // Step 1: the agent reads the original file contents.
                    let content = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::ReadTextFileParams {
                                path: path!("/tmp/foo").into(),
                                line: None,
                                limit: None,
                            })
                        })?
                        .await
                        .unwrap();
                    assert_eq!(content.content, "one\ntwo\nthree\n");
                    // Step 2: notify the test so it can apply the user's edit.
                    read_file_tx.take().unwrap().send(()).unwrap();
                    // Step 3: the agent writes a version based on what it read earlier.
                    server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::WriteTextFileParams {
                                path: path!("/tmp/foo").into(),
                                content: "one\ntwo\nthree\nfour\nfive\n".to_string(),
                            })
                        })?
                        .await
                        .unwrap();
                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        // Wait for the agent's read, then make the user's edit at the start of the buffer.
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        // The user's "zero" line and the agent's "four"/"five" lines must both be present,
        // in the buffer and in the file on disk.
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
1630
    // Verifies the status transitions of a tool call that is canceled by the user but
    // later reported as finished by the agent: InProgress -> Canceled -> Completed,
    // while the original send request resolves with a "canceled" error.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        // The handler blocks on `end_turn_rx`, keeping the turn open until the test
        // drops `end_turn_tx`.
        let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();

        // Shared slot through which the handler hands the pushed tool call id to the test.
        let tool_call_id = Rc::new(RefCell::new(None));
        let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
        fake_server.update(cx, |fake_server, _| {
            let tool_call_id = tool_call_id.clone();
            fake_server.on_user_message(move |_, server, mut cx| {
                let end_turn_rx = end_turn_rx.clone();
                let tool_call_id = tool_call_id.clone();
                async move {
                    let tool_call_result = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::PushToolCallParams {
                                label: "Fetch".to_string(),
                                icon: acp_old::Icon::Globe,
                                content: None,
                                locations: vec![],
                            })
                        })?
                        .await
                        .unwrap();
                    *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
                    end_turn_rx.take().unwrap().await.ok();

                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; entry 1 is the pushed tool call, still running.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::InProgress,
                        ..
                    },
                    ..
                })
            ));
        });

        cx.run_until_parked();

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        // Canceling the thread marks the in-flight tool call as canceled.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // The agent nevertheless reports the tool call as finished afterwards.
        fake_server
            .update(cx, |fake_server, _| {
                fake_server.send_to_zed(acp_old::UpdateToolCallParams {
                    tool_call_id: tool_call_id.borrow().unwrap(),
                    status: acp_old::ToolCallStatus::Finished,
                    content: None,
                })
            })
            .await
            .unwrap();

        drop(end_turn_tx);
        assert!(request.await.unwrap_err().to_string().contains("canceled"));

        // The late update wins: the entry ends up allowed and completed.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::Completed,
                        ..
                    },
                    ..
                })
            ));
        });
    }
1728
    /// Waits until the thread contains a tool call entry and returns that entry's index.
    ///
    /// Subscribes to the thread's events and, on each event, scans the entries for the
    /// first `AgentThreadEntry::ToolCall`.
    ///
    /// # Panics
    ///
    /// Panics with "Timeout waiting for tool call" if none shows up within 10 seconds.
    async fn run_until_first_tool_call(
        thread: &Entity<AcpThread>,
        cx: &mut TestAppContext,
    ) -> usize {
        let (mut tx, mut rx) = mpsc::channel::<usize>(1);

        let subscription = cx.update(|cx| {
            cx.subscribe(thread, move |thread, _, cx| {
                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
                        // NOTE(review): this runs on every event while subscribed, and
                        // `try_send(..).unwrap()` panics if the channel is full or closed —
                        // confirm that cannot happen before the subscription is dropped.
                        return tx.try_send(ix).unwrap();
                    }
                }
            })
        });

        select! {
            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
                panic!("Timeout waiting for tool call")
            }
            ix = rx.next().fuse() => {
                // Stop listening as soon as the first index has been received.
                drop(subscription);
                ix.unwrap()
            }
        }
    }
1755
1756 pub fn fake_acp_thread(
1757 project: Entity<Project>,
1758 cx: &mut TestAppContext,
1759 ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1760 let (stdin_tx, stdin_rx) = async_pipe::pipe();
1761 let (stdout_tx, stdout_rx) = async_pipe::pipe();
1762
1763 let thread = cx.new(|cx| {
1764 let foreground_executor = cx.foreground_executor().clone();
1765 let thread_rc = Rc::new(RefCell::new(cx.entity().downgrade()));
1766
1767 let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
1768 OldAcpClientDelegate::new(thread_rc.clone(), cx.to_async()),
1769 stdin_tx,
1770 stdout_rx,
1771 move |fut| {
1772 foreground_executor.spawn(fut).detach();
1773 },
1774 );
1775
1776 let io_task = cx.background_spawn({
1777 async move {
1778 io_fut.await.log_err();
1779 Ok(())
1780 }
1781 });
1782 let connection = OldAcpAgentConnection {
1783 name: "test",
1784 connection,
1785 child_status: io_task,
1786 };
1787
1788 AcpThread::new(
1789 Rc::new(connection),
1790 project,
1791 acp::SessionId("test".into()),
1792 cx,
1793 )
1794 });
1795 let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1796 (thread, agent)
1797 }
1798
    /// Test double playing the agent side of the old protocol.
    ///
    /// Tests install a per-message callback with `on_user_message` and use
    /// `send_to_zed` to issue requests to the client.
    pub struct FakeAcpServer {
        // Handle used by `send_to_zed` to issue requests to the client.
        connection: acp_old::ClientConnection,

        // Holds the background task that drives the connection's I/O future.
        _io_task: Task<()>,
        // Callback run for each incoming user message; `None` until
        // `on_user_message` has been called.
        on_user_message: Option<
            Rc<
                dyn Fn(
                    acp_old::SendUserMessageParams,
                    Entity<FakeAcpServer>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<(), acp_old::Error>>,
            >,
        >,
    }
1813
    /// `acp_old::Agent` implementation handed to the connection by `FakeAcpServer::new`.
    #[derive(Clone)]
    struct FakeAgent {
        // The server entity whose `on_user_message` handler is looked up per message.
        server: Entity<FakeAcpServer>,
        // Async app handle used to read the server entity and passed on to handlers.
        cx: AsyncApp,
        // Sender that cancels the in-flight `send_user_message`, if any; replaced on
        // every new message and taken by `cancel_send_message`.
        cancel_tx: Rc<RefCell<Option<oneshot::Sender<()>>>>,
    }
1820
1821 impl acp_old::Agent for FakeAgent {
1822 async fn initialize(
1823 &self,
1824 params: acp_old::InitializeParams,
1825 ) -> Result<acp_old::InitializeResponse, acp_old::Error> {
1826 Ok(acp_old::InitializeResponse {
1827 protocol_version: params.protocol_version,
1828 is_authenticated: true,
1829 })
1830 }
1831
1832 async fn authenticate(&self) -> Result<(), acp_old::Error> {
1833 Ok(())
1834 }
1835
1836 async fn cancel_send_message(&self) -> Result<(), acp_old::Error> {
1837 if let Some(cancel_tx) = self.cancel_tx.take() {
1838 cancel_tx.send(()).log_err();
1839 }
1840 Ok(())
1841 }
1842
1843 async fn send_user_message(
1844 &self,
1845 request: acp_old::SendUserMessageParams,
1846 ) -> Result<(), acp_old::Error> {
1847 let (cancel_tx, cancel_rx) = oneshot::channel();
1848 self.cancel_tx.replace(Some(cancel_tx));
1849
1850 let mut cx = self.cx.clone();
1851 let handler = self
1852 .server
1853 .update(&mut cx, |server, _| server.on_user_message.clone())
1854 .ok()
1855 .flatten();
1856 if let Some(handler) = handler {
1857 select! {
1858 _ = cancel_rx.fuse() => Err(anyhow::anyhow!("Message sending canceled").into()),
1859 _ = handler(request, self.server.clone(), self.cx.clone()).fuse() => Ok(()),
1860 }
1861 } else {
1862 Err(anyhow::anyhow!("No handler for on_user_message").into())
1863 }
1864 }
1865 }
1866
1867 impl FakeAcpServer {
1868 fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1869 let agent = FakeAgent {
1870 server: cx.entity(),
1871 cx: cx.to_async(),
1872 cancel_tx: Default::default(),
1873 };
1874 let foreground_executor = cx.foreground_executor().clone();
1875
1876 let (connection, io_fut) = acp_old::ClientConnection::connect_to_client(
1877 agent.clone(),
1878 stdout,
1879 stdin,
1880 move |fut| {
1881 foreground_executor.spawn(fut).detach();
1882 },
1883 );
1884 FakeAcpServer {
1885 connection: connection,
1886 on_user_message: None,
1887 _io_task: cx.background_spawn(async move {
1888 io_fut.await.log_err();
1889 }),
1890 }
1891 }
1892
1893 fn on_user_message<F>(
1894 &mut self,
1895 handler: impl for<'a> Fn(
1896 acp_old::SendUserMessageParams,
1897 Entity<FakeAcpServer>,
1898 AsyncApp,
1899 ) -> F
1900 + 'static,
1901 ) where
1902 F: Future<Output = Result<(), acp_old::Error>> + 'static,
1903 {
1904 self.on_user_message
1905 .replace(Rc::new(move |request, server, cx| {
1906 handler(request, server, cx).boxed_local()
1907 }));
1908 }
1909
1910 fn send_to_zed<T: acp_old::ClientRequest + 'static>(
1911 &self,
1912 message: T,
1913 ) -> BoxedLocal<Result<T::Response>> {
1914 self.connection
1915 .request(message)
1916 .map(|f| f.map_err(|err| anyhow!(err)))
1917 .boxed_local()
1918 }
1919 }
1920}