1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6pub use connection::*;
7pub use diff::*;
8pub use mention::*;
9pub use terminal::*;
10
11use action_log::ActionLog;
12use agent_client_protocol as acp;
13use anyhow::{Context as _, Result, anyhow};
14use editor::Bias;
15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
17use itertools::Itertools;
18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
19use markdown::Markdown;
20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
21use std::collections::HashMap;
22use std::error::Error;
23use std::fmt::{Formatter, Write};
24use std::ops::Range;
25use std::process::ExitStatus;
26use std::rc::Rc;
27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
28use ui::App;
29use util::ResultExt;
30
/// A message authored by the user within an agent thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of this message, when one has been assigned.
    pub id: Option<UserMessageId>,
    /// Renderable content built up by appending each of the message's chunks.
    pub content: ContentBlock,
    /// The raw protocol content blocks this message is made of.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint associated with this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
38
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When true, the markdown export labels the message heading with "(checkpoint)".
    pub show: bool,
}
44
45impl UserMessage {
46 fn to_markdown(&self, cx: &App) -> String {
47 let mut markdown = String::new();
48 if self
49 .checkpoint
50 .as_ref()
51 .map_or(false, |checkpoint| checkpoint.show)
52 {
53 writeln!(markdown, "## User (checkpoint)").unwrap();
54 } else {
55 writeln!(markdown, "## User").unwrap();
56 }
57 writeln!(markdown).unwrap();
58 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
59 writeln!(markdown).unwrap();
60 markdown
61 }
62}
63
/// A message produced by the agent, stored as an ordered list of chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Message and thought chunks, in the order they were received.
    pub chunks: Vec<AssistantMessageChunk>,
}
68
69impl AssistantMessage {
70 pub fn to_markdown(&self, cx: &App) -> String {
71 format!(
72 "## Assistant\n\n{}\n\n",
73 self.chunks
74 .iter()
75 .map(|chunk| chunk.to_markdown(cx))
76 .join("\n\n")
77 )
78 }
79}
80
/// One contiguous piece of an assistant message.
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular response content.
    Message { block: ContentBlock },
    /// Thought content; exported to markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
86
87impl AssistantMessageChunk {
88 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
89 Self::Message {
90 block: ContentBlock::new(chunk.into(), language_registry, cx),
91 }
92 }
93
94 fn to_markdown(&self, cx: &App) -> String {
95 match self {
96 Self::Message { block } => block.to_markdown(cx).to_string(),
97 Self::Thought { block } => {
98 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
99 }
100 }
101 }
102}
103
/// A single entry in an agent thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message (response and/or thoughts) produced by the agent.
    AssistantMessage(AssistantMessage),
    /// A tool invocation reported by the agent.
    ToolCall(ToolCall),
}
110
111impl AgentThreadEntry {
112 fn to_markdown(&self, cx: &App) -> String {
113 match self {
114 Self::UserMessage(message) => message.to_markdown(cx),
115 Self::AssistantMessage(message) => message.to_markdown(cx),
116 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
117 }
118 }
119
120 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
121 if let AgentThreadEntry::ToolCall(call) = self {
122 itertools::Either::Left(call.diffs())
123 } else {
124 itertools::Either::Right(std::iter::empty())
125 }
126 }
127
128 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
129 if let AgentThreadEntry::ToolCall(call) = self {
130 itertools::Either::Left(call.terminals())
131 } else {
132 itertools::Either::Right(std::iter::empty())
133 }
134 }
135
136 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
137 if let AgentThreadEntry::ToolCall(ToolCall {
138 locations,
139 resolved_locations,
140 ..
141 }) = self
142 {
143 Some((
144 locations.get(ix)?.clone(),
145 resolved_locations.get(ix)?.clone()?,
146 ))
147 } else {
148 None
149 }
150 }
151}
152
/// A tool invocation tracked by the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier; used to look the call up when updates arrive.
    pub id: acp::ToolCallId,
    /// Markdown entity created from the tool call's title.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported over the protocol.
    pub kind: acp::ToolKind,
    /// Content items attached to the call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current authorization / execution status.
    pub status: ToolCallStatus,
    /// Locations reported for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Results of resolving `locations`; an entry is `None` when resolution failed.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input payload, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw output payload, if provided.
    pub raw_output: Option<serde_json::Value>,
}
165
166impl ToolCall {
167 fn from_acp(
168 tool_call: acp::ToolCall,
169 status: ToolCallStatus,
170 language_registry: Arc<LanguageRegistry>,
171 cx: &mut App,
172 ) -> Self {
173 Self {
174 id: tool_call.id,
175 label: cx.new(|cx| {
176 Markdown::new(
177 tool_call.title.into(),
178 Some(language_registry.clone()),
179 None,
180 cx,
181 )
182 }),
183 kind: tool_call.kind,
184 content: tool_call
185 .content
186 .into_iter()
187 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
188 .collect(),
189 locations: tool_call.locations,
190 resolved_locations: Vec::default(),
191 status,
192 raw_input: tool_call.raw_input,
193 raw_output: tool_call.raw_output,
194 }
195 }
196
197 fn update_fields(
198 &mut self,
199 fields: acp::ToolCallUpdateFields,
200 language_registry: Arc<LanguageRegistry>,
201 cx: &mut App,
202 ) {
203 let acp::ToolCallUpdateFields {
204 kind,
205 status,
206 title,
207 content,
208 locations,
209 raw_input,
210 raw_output,
211 } = fields;
212
213 if let Some(kind) = kind {
214 self.kind = kind;
215 }
216
217 if let Some(status) = status {
218 self.status = ToolCallStatus::Allowed { status };
219 }
220
221 if let Some(title) = title {
222 self.label.update(cx, |label, cx| {
223 label.replace(title, cx);
224 });
225 }
226
227 if let Some(content) = content {
228 self.content = content
229 .into_iter()
230 .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
231 .collect();
232 }
233
234 if let Some(locations) = locations {
235 self.locations = locations;
236 }
237
238 if let Some(raw_input) = raw_input {
239 self.raw_input = Some(raw_input);
240 }
241
242 if let Some(raw_output) = raw_output {
243 if self.content.is_empty() {
244 if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
245 {
246 self.content
247 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
248 markdown,
249 }));
250 }
251 }
252 self.raw_output = Some(raw_output);
253 }
254 }
255
256 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
257 self.content.iter().filter_map(|content| match content {
258 ToolCallContent::Diff(diff) => Some(diff),
259 ToolCallContent::ContentBlock(_) => None,
260 ToolCallContent::Terminal(_) => None,
261 })
262 }
263
264 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
265 self.content.iter().filter_map(|content| match content {
266 ToolCallContent::Terminal(terminal) => Some(terminal),
267 ToolCallContent::ContentBlock(_) => None,
268 ToolCallContent::Diff(_) => None,
269 })
270 }
271
272 fn to_markdown(&self, cx: &App) -> String {
273 let mut markdown = format!(
274 "**Tool Call: {}**\nStatus: {}\n\n",
275 self.label.read(cx).source(),
276 self.status
277 );
278 for content in &self.content {
279 markdown.push_str(content.to_markdown(cx).as_str());
280 markdown.push_str("\n\n");
281 }
282 markdown
283 }
284
285 async fn resolve_location(
286 location: acp::ToolCallLocation,
287 project: WeakEntity<Project>,
288 cx: &mut AsyncApp,
289 ) -> Option<AgentLocation> {
290 let buffer = project
291 .update(cx, |project, cx| {
292 if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
293 Some(project.open_buffer(path, cx))
294 } else {
295 None
296 }
297 })
298 .ok()??;
299 let buffer = buffer.await.log_err()?;
300 let position = buffer
301 .update(cx, |buffer, _| {
302 if let Some(row) = location.line {
303 let snapshot = buffer.snapshot();
304 let column = snapshot.indent_size_for_line(row).len;
305 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
306 snapshot.anchor_before(point)
307 } else {
308 Anchor::MIN
309 }
310 })
311 .ok()?;
312
313 Some(AgentLocation {
314 buffer: buffer.downgrade(),
315 position,
316 })
317 }
318
319 fn resolve_locations(
320 &self,
321 project: Entity<Project>,
322 cx: &mut App,
323 ) -> Task<Vec<Option<AgentLocation>>> {
324 let locations = self.locations.clone();
325 project.update(cx, |_, cx| {
326 cx.spawn(async move |project, cx| {
327 let mut new_locations = Vec::new();
328 for location in locations {
329 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
330 }
331 new_locations
332 })
333 })
334 }
335}
336
/// Authorization and execution state of a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The call is waiting for the user to pick one of `options`.
    WaitingForConfirmation {
        /// Permission options offered for this call.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the chosen option id is sent back.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The call may run; `status` carries its protocol-level progress.
    Allowed {
        status: acp::ToolCallStatus,
    },
    /// The call was rejected.
    Rejected,
    /// The call was canceled.
    Canceled,
}
349
350impl Display for ToolCallStatus {
351 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
352 write!(
353 f,
354 "{}",
355 match self {
356 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
357 ToolCallStatus::Allowed { status } => match status {
358 acp::ToolCallStatus::Pending => "Pending",
359 acp::ToolCallStatus::InProgress => "In Progress",
360 acp::ToolCallStatus::Completed => "Completed",
361 acp::ToolCallStatus::Failed => "Failed",
362 },
363 ToolCallStatus::Rejected => "Rejected",
364 ToolCallStatus::Canceled => "Canceled",
365 }
366 )
367 }
368}
369
/// Renderable content accumulated from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Text content held in a markdown entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link that has not been merged with other content.
    ResourceLink { resource_link: acp::ResourceLink },
}
376
377impl ContentBlock {
378 pub fn new(
379 block: acp::ContentBlock,
380 language_registry: &Arc<LanguageRegistry>,
381 cx: &mut App,
382 ) -> Self {
383 let mut this = Self::Empty;
384 this.append(block, language_registry, cx);
385 this
386 }
387
388 pub fn new_combined(
389 blocks: impl IntoIterator<Item = acp::ContentBlock>,
390 language_registry: Arc<LanguageRegistry>,
391 cx: &mut App,
392 ) -> Self {
393 let mut this = Self::Empty;
394 for block in blocks {
395 this.append(block, &language_registry, cx);
396 }
397 this
398 }
399
400 pub fn append(
401 &mut self,
402 block: acp::ContentBlock,
403 language_registry: &Arc<LanguageRegistry>,
404 cx: &mut App,
405 ) {
406 if matches!(self, ContentBlock::Empty) {
407 if let acp::ContentBlock::ResourceLink(resource_link) = block {
408 *self = ContentBlock::ResourceLink { resource_link };
409 return;
410 }
411 }
412
413 let new_content = self.block_string_contents(block);
414
415 match self {
416 ContentBlock::Empty => {
417 *self = Self::create_markdown_block(new_content, language_registry, cx);
418 }
419 ContentBlock::Markdown { markdown } => {
420 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
421 }
422 ContentBlock::ResourceLink { resource_link } => {
423 let existing_content = Self::resource_link_md(&resource_link.uri);
424 let combined = format!("{}\n{}", existing_content, new_content);
425
426 *self = Self::create_markdown_block(combined, language_registry, cx);
427 }
428 }
429 }
430
431 fn create_markdown_block(
432 content: String,
433 language_registry: &Arc<LanguageRegistry>,
434 cx: &mut App,
435 ) -> ContentBlock {
436 ContentBlock::Markdown {
437 markdown: cx
438 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
439 }
440 }
441
442 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
443 match block {
444 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
445 acp::ContentBlock::ResourceLink(resource_link) => {
446 Self::resource_link_md(&resource_link.uri)
447 }
448 acp::ContentBlock::Resource(acp::EmbeddedResource {
449 resource:
450 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
451 uri,
452 ..
453 }),
454 ..
455 }) => Self::resource_link_md(&uri),
456 acp::ContentBlock::Image(image) => Self::image_md(&image),
457 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
458 }
459 }
460
461 fn resource_link_md(uri: &str) -> String {
462 if let Some(uri) = MentionUri::parse(&uri).log_err() {
463 uri.as_link().to_string()
464 } else {
465 uri.to_string()
466 }
467 }
468
469 fn image_md(_image: &acp::ImageContent) -> String {
470 "`Image`".into()
471 }
472
473 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
474 match self {
475 ContentBlock::Empty => "",
476 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
477 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
478 }
479 }
480
481 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
482 match self {
483 ContentBlock::Empty => None,
484 ContentBlock::Markdown { markdown } => Some(markdown),
485 ContentBlock::ResourceLink { .. } => None,
486 }
487 }
488
489 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
490 match self {
491 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
492 _ => None,
493 }
494 }
495}
496
/// One piece of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Plain renderable content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
503
504impl ToolCallContent {
505 pub fn from_acp(
506 content: acp::ToolCallContent,
507 language_registry: Arc<LanguageRegistry>,
508 cx: &mut App,
509 ) -> Self {
510 match content {
511 acp::ToolCallContent::Content { content } => {
512 Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
513 }
514 acp::ToolCallContent::Diff { diff } => {
515 Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
516 }
517 }
518 }
519
520 pub fn to_markdown(&self, cx: &App) -> String {
521 match self {
522 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
523 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
524 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
525 }
526 }
527}
528
/// An update that can be applied to an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A partial field update received over the protocol.
    UpdateFields(acp::ToolCallUpdate),
    /// Replaces the call's content with a diff.
    UpdateDiff(ToolCallUpdateDiff),
    /// Replaces the call's content with a terminal.
    UpdateTerminal(ToolCallUpdateTerminal),
}
535
536impl ToolCallUpdate {
537 fn id(&self) -> &acp::ToolCallId {
538 match self {
539 Self::UpdateFields(update) => &update.id,
540 Self::UpdateDiff(diff) => &diff.id,
541 Self::UpdateTerminal(terminal) => &terminal.id,
542 }
543 }
544}
545
546impl From<acp::ToolCallUpdate> for ToolCallUpdate {
547 fn from(update: acp::ToolCallUpdate) -> Self {
548 Self::UpdateFields(update)
549 }
550}
551
552impl From<ToolCallUpdateDiff> for ToolCallUpdate {
553 fn from(diff: ToolCallUpdateDiff) -> Self {
554 Self::UpdateDiff(diff)
555 }
556}
557
/// Update that attaches a diff to the tool call identified by `id`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// Diff entity that becomes the call's content.
    pub diff: Entity<Diff>,
}
563
564impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
565 fn from(terminal: ToolCallUpdateTerminal) -> Self {
566 Self::UpdateTerminal(terminal)
567 }
568}
569
/// Update that attaches a terminal to the tool call identified by `id`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// Terminal entity that becomes the call's content.
    pub terminal: Entity<Terminal>,
}
575
/// The agent's current plan, as an ordered list of entries.
#[derive(Debug, Default)]
pub struct Plan {
    /// Plan entries in the order received.
    pub entries: Vec<PlanEntry>,
}
580
/// Summary counts over a [`Plan`], produced by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of pending entries.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
587
588impl Plan {
589 pub fn is_empty(&self) -> bool {
590 self.entries.is_empty()
591 }
592
593 pub fn stats(&self) -> PlanStats<'_> {
594 let mut stats = PlanStats {
595 in_progress_entry: None,
596 pending: 0,
597 completed: 0,
598 };
599
600 for entry in &self.entries {
601 match &entry.status {
602 acp::PlanEntryStatus::Pending => {
603 stats.pending += 1;
604 }
605 acp::PlanEntryStatus::InProgress => {
606 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
607 }
608 acp::PlanEntryStatus::Completed => {
609 stats.completed += 1;
610 }
611 }
612 }
613
614 stats
615 }
616}
617
/// A single step of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority as reported over the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported over the protocol.
    pub status: acp::PlanEntryStatus,
}
624
625impl PlanEntry {
626 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
627 Self {
628 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
629 priority: entry.priority,
630 status: entry.status,
631 }
632 }
633}
634
/// A conversation with an agent: its entries, its plan, and the connection
/// used to drive it.
pub struct AcpThread {
    /// Display title of the thread.
    title: SharedString,
    /// User messages, assistant messages and tool calls, in order.
    entries: Vec<AgentThreadEntry>,
    /// The agent's current plan.
    plan: Plan,
    /// Project this thread operates on.
    project: Entity<Project>,
    /// Action log created for `project` when the thread is constructed.
    action_log: Entity<ActionLog>,
    // NOTE(review): presumably snapshots of buffers shared with the agent — usage is not
    // visible in this part of the file; confirm.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task of the turn currently in flight; `None` when the thread is idle.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Session this thread belongs to on the connection.
    session_id: acp::SessionId,
}
646
/// Events emitted by [`AcpThread`].
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    // NOTE(review): presumably the given index range of entries was removed — the emit site
    // is not visible in this part of the file; confirm.
    EntriesRemoved(Range<usize>),
    /// A tool call is waiting for user authorization.
    ToolAuthorizationRequired,
    /// A turn finished without an error result.
    Stopped,
    /// A turn finished with an error.
    Error,
    // NOTE(review): presumably the agent server process exited with this status — emit site
    // not visible here; confirm.
    ServerExited(ExitStatus),
}
656
// Marker impl that lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
658
/// Coarse state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No turn is in flight.
    Idle,
    /// A turn is in flight and a tool call is awaiting confirmation.
    WaitingForToolConfirmation,
    /// A turn is in flight and nothing is awaiting confirmation.
    Generating,
}
665
/// Errors describing why an agent could not be loaded.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The agent is not supported as-is; carries upgrade guidance.
    Unsupported {
        /// Message shown when this error is displayed.
        error_message: SharedString,
        /// Message describing the upgrade.
        upgrade_message: SharedString,
        /// Command that performs the upgrade.
        upgrade_command: String,
    },
    /// The server exited with the given status code.
    Exited(i32),
    /// Any other failure, described by a message.
    Other(SharedString),
}
676
677impl Display for LoadError {
678 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
679 match self {
680 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
681 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
682 LoadError::Other(msg) => write!(f, "{}", msg),
683 }
684 }
685}
686
// `LoadError` relies on the default `Error` methods; it wraps no source error.
impl Error for LoadError {}
688
689impl AcpThread {
690 pub fn new(
691 title: impl Into<SharedString>,
692 connection: Rc<dyn AgentConnection>,
693 project: Entity<Project>,
694 session_id: acp::SessionId,
695 cx: &mut Context<Self>,
696 ) -> Self {
697 let action_log = cx.new(|_| ActionLog::new(project.clone()));
698
699 Self {
700 action_log,
701 shared_buffers: Default::default(),
702 entries: Default::default(),
703 plan: Default::default(),
704 title: title.into(),
705 project,
706 send_task: None,
707 connection,
708 session_id,
709 }
710 }
711
    /// Returns the agent connection used by this thread.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
715
    /// Returns the action log created for this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
719
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
723
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
727
    /// Returns all entries of the thread, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
731
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
735
736 pub fn status(&self) -> ThreadStatus {
737 if self.send_task.is_some() {
738 if self.waiting_for_tool_confirmation() {
739 ThreadStatus::WaitingForToolConfirmation
740 } else {
741 ThreadStatus::Generating
742 }
743 } else {
744 ThreadStatus::Idle
745 }
746 }
747
748 pub fn has_pending_edit_tool_calls(&self) -> bool {
749 for entry in self.entries.iter().rev() {
750 match entry {
751 AgentThreadEntry::UserMessage(_) => return false,
752 AgentThreadEntry::ToolCall(
753 call @ ToolCall {
754 status:
755 ToolCallStatus::Allowed {
756 status:
757 acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
758 },
759 ..
760 },
761 ) if call.diffs().next().is_some() => {
762 return true;
763 }
764 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
765 }
766 }
767
768 false
769 }
770
771 pub fn used_tools_since_last_user_message(&self) -> bool {
772 for entry in self.entries.iter().rev() {
773 match entry {
774 AgentThreadEntry::UserMessage(..) => return false,
775 AgentThreadEntry::AssistantMessage(..) => continue,
776 AgentThreadEntry::ToolCall(..) => return true,
777 }
778 }
779
780 false
781 }
782
783 pub fn handle_session_update(
784 &mut self,
785 update: acp::SessionUpdate,
786 cx: &mut Context<Self>,
787 ) -> Result<()> {
788 match update {
789 acp::SessionUpdate::UserMessageChunk { content } => {
790 self.push_user_content_block(None, content, cx);
791 }
792 acp::SessionUpdate::AgentMessageChunk { content } => {
793 self.push_assistant_content_block(content, false, cx);
794 }
795 acp::SessionUpdate::AgentThoughtChunk { content } => {
796 self.push_assistant_content_block(content, true, cx);
797 }
798 acp::SessionUpdate::ToolCall(tool_call) => {
799 self.upsert_tool_call(tool_call, cx);
800 }
801 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
802 self.update_tool_call(tool_call_update, cx)?;
803 }
804 acp::SessionUpdate::Plan(plan) => {
805 self.update_plan(plan, cx);
806 }
807 }
808 Ok(())
809 }
810
811 pub fn push_user_content_block(
812 &mut self,
813 message_id: Option<UserMessageId>,
814 chunk: acp::ContentBlock,
815 cx: &mut Context<Self>,
816 ) {
817 let language_registry = self.project.read(cx).languages().clone();
818 let entries_len = self.entries.len();
819
820 if let Some(last_entry) = self.entries.last_mut()
821 && let AgentThreadEntry::UserMessage(UserMessage {
822 id,
823 content,
824 chunks,
825 ..
826 }) = last_entry
827 {
828 *id = message_id.or(id.take());
829 content.append(chunk.clone(), &language_registry, cx);
830 chunks.push(chunk);
831 let idx = entries_len - 1;
832 cx.emit(AcpThreadEvent::EntryUpdated(idx));
833 } else {
834 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
835 self.push_entry(
836 AgentThreadEntry::UserMessage(UserMessage {
837 id: message_id,
838 content,
839 chunks: vec![chunk],
840 checkpoint: None,
841 }),
842 cx,
843 );
844 }
845 }
846
847 pub fn push_assistant_content_block(
848 &mut self,
849 chunk: acp::ContentBlock,
850 is_thought: bool,
851 cx: &mut Context<Self>,
852 ) {
853 let language_registry = self.project.read(cx).languages().clone();
854 let entries_len = self.entries.len();
855 if let Some(last_entry) = self.entries.last_mut()
856 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
857 {
858 let idx = entries_len - 1;
859 cx.emit(AcpThreadEvent::EntryUpdated(idx));
860 match (chunks.last_mut(), is_thought) {
861 (Some(AssistantMessageChunk::Message { block }), false)
862 | (Some(AssistantMessageChunk::Thought { block }), true) => {
863 block.append(chunk, &language_registry, cx)
864 }
865 _ => {
866 let block = ContentBlock::new(chunk, &language_registry, cx);
867 if is_thought {
868 chunks.push(AssistantMessageChunk::Thought { block })
869 } else {
870 chunks.push(AssistantMessageChunk::Message { block })
871 }
872 }
873 }
874 } else {
875 let block = ContentBlock::new(chunk, &language_registry, cx);
876 let chunk = if is_thought {
877 AssistantMessageChunk::Thought { block }
878 } else {
879 AssistantMessageChunk::Message { block }
880 };
881
882 self.push_entry(
883 AgentThreadEntry::AssistantMessage(AssistantMessage {
884 chunks: vec![chunk],
885 }),
886 cx,
887 );
888 }
889 }
890
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
895
896 pub fn update_tool_call(
897 &mut self,
898 update: impl Into<ToolCallUpdate>,
899 cx: &mut Context<Self>,
900 ) -> Result<()> {
901 let update = update.into();
902 let languages = self.project.read(cx).languages().clone();
903
904 let (ix, current_call) = self
905 .tool_call_mut(update.id())
906 .context("Tool call not found")?;
907 match update {
908 ToolCallUpdate::UpdateFields(update) => {
909 let location_updated = update.fields.locations.is_some();
910 current_call.update_fields(update.fields, languages, cx);
911 if location_updated {
912 self.resolve_locations(update.id.clone(), cx);
913 }
914 }
915 ToolCallUpdate::UpdateDiff(update) => {
916 current_call.content.clear();
917 current_call
918 .content
919 .push(ToolCallContent::Diff(update.diff));
920 }
921 ToolCallUpdate::UpdateTerminal(update) => {
922 current_call.content.clear();
923 current_call
924 .content
925 .push(ToolCallContent::Terminal(update.terminal));
926 }
927 }
928
929 cx.emit(AcpThreadEvent::EntryUpdated(ix));
930
931 Ok(())
932 }
933
934 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
935 pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
936 let status = ToolCallStatus::Allowed {
937 status: tool_call.status,
938 };
939 self.upsert_tool_call_inner(tool_call, status, cx)
940 }
941
942 pub fn upsert_tool_call_inner(
943 &mut self,
944 tool_call: acp::ToolCall,
945 status: ToolCallStatus,
946 cx: &mut Context<Self>,
947 ) {
948 let language_registry = self.project.read(cx).languages().clone();
949 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
950 let id = call.id.clone();
951
952 if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
953 *current_call = call;
954
955 cx.emit(AcpThreadEvent::EntryUpdated(ix));
956 } else {
957 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
958 };
959
960 self.resolve_locations(id, cx);
961 }
962
963 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
964 // The tool call we are looking for is typically the last one, or very close to the end.
965 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
966 self.entries
967 .iter_mut()
968 .enumerate()
969 .rev()
970 .find_map(|(index, tool_call)| {
971 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
972 && &tool_call.id == id
973 {
974 Some((index, tool_call))
975 } else {
976 None
977 }
978 })
979 }
980
981 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
982 let project = self.project.clone();
983 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
984 return;
985 };
986 let task = tool_call.resolve_locations(project, cx);
987 cx.spawn(async move |this, cx| {
988 let resolved_locations = task.await;
989 this.update(cx, |this, cx| {
990 let project = this.project.clone();
991 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
992 return;
993 };
994 if let Some(Some(location)) = resolved_locations.last() {
995 project.update(cx, |project, cx| {
996 if let Some(agent_location) = project.agent_location() {
997 let should_ignore = agent_location.buffer == location.buffer
998 && location
999 .buffer
1000 .update(cx, |buffer, _| {
1001 let snapshot = buffer.snapshot();
1002 let old_position =
1003 agent_location.position.to_point(&snapshot);
1004 let new_position = location.position.to_point(&snapshot);
1005 // ignore this so that when we get updates from the edit tool
1006 // the position doesn't reset to the startof line
1007 old_position.row == new_position.row
1008 && old_position.column > new_position.column
1009 })
1010 .ok()
1011 .unwrap_or_default();
1012 if !should_ignore {
1013 project.set_agent_location(Some(location.clone()), cx);
1014 }
1015 }
1016 });
1017 }
1018 if tool_call.resolved_locations != resolved_locations {
1019 tool_call.resolved_locations = resolved_locations;
1020 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1021 }
1022 })
1023 })
1024 .detach();
1025 }
1026
1027 pub fn request_tool_call_authorization(
1028 &mut self,
1029 tool_call: acp::ToolCall,
1030 options: Vec<acp::PermissionOption>,
1031 cx: &mut Context<Self>,
1032 ) -> oneshot::Receiver<acp::PermissionOptionId> {
1033 let (tx, rx) = oneshot::channel();
1034
1035 let status = ToolCallStatus::WaitingForConfirmation {
1036 options,
1037 respond_tx: tx,
1038 };
1039
1040 self.upsert_tool_call_inner(tool_call, status, cx);
1041 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1042 rx
1043 }
1044
1045 pub fn authorize_tool_call(
1046 &mut self,
1047 id: acp::ToolCallId,
1048 option_id: acp::PermissionOptionId,
1049 option_kind: acp::PermissionOptionKind,
1050 cx: &mut Context<Self>,
1051 ) {
1052 let Some((ix, call)) = self.tool_call_mut(&id) else {
1053 return;
1054 };
1055
1056 let new_status = match option_kind {
1057 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1058 ToolCallStatus::Rejected
1059 }
1060 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1061 ToolCallStatus::Allowed {
1062 status: acp::ToolCallStatus::InProgress,
1063 }
1064 }
1065 };
1066
1067 let curr_status = mem::replace(&mut call.status, new_status);
1068
1069 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1070 respond_tx.send(option_id).log_err();
1071 } else if cfg!(debug_assertions) {
1072 panic!("tried to authorize an already authorized tool call");
1073 }
1074
1075 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1076 }
1077
1078 /// Returns true if the last turn is awaiting tool authorization
1079 pub fn waiting_for_tool_confirmation(&self) -> bool {
1080 for entry in self.entries.iter().rev() {
1081 match &entry {
1082 AgentThreadEntry::ToolCall(call) => match call.status {
1083 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1084 ToolCallStatus::Allowed { .. }
1085 | ToolCallStatus::Rejected
1086 | ToolCallStatus::Canceled => continue,
1087 },
1088 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1089 // Reached the beginning of the turn
1090 return false;
1091 }
1092 }
1093 }
1094 false
1095 }
1096
    /// Returns the agent's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1100
1101 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1102 let new_entries_len = request.entries.len();
1103 let mut new_entries = request.entries.into_iter();
1104
1105 // Reuse existing markdown to prevent flickering
1106 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1107 let PlanEntry {
1108 content,
1109 priority,
1110 status,
1111 } = old;
1112 content.update(cx, |old, cx| {
1113 old.replace(new.content, cx);
1114 });
1115 *priority = new.priority;
1116 *status = new.status;
1117 }
1118 for new in new_entries {
1119 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1120 }
1121 self.plan.entries.truncate(new_entries_len);
1122
1123 cx.notify();
1124 }
1125
1126 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1127 self.plan
1128 .entries
1129 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1130 cx.notify();
1131 }
1132
1133 #[cfg(any(test, feature = "test-support"))]
1134 pub fn send_raw(
1135 &mut self,
1136 message: &str,
1137 cx: &mut Context<Self>,
1138 ) -> BoxFuture<'static, Result<()>> {
1139 self.send(
1140 vec![acp::ContentBlock::Text(acp::TextContent {
1141 text: message.to_string(),
1142 annotations: None,
1143 })],
1144 cx,
1145 )
1146 }
1147
1148 pub fn send(
1149 &mut self,
1150 message: Vec<acp::ContentBlock>,
1151 cx: &mut Context<Self>,
1152 ) -> BoxFuture<'static, Result<()>> {
1153 let block = ContentBlock::new_combined(
1154 message.clone(),
1155 self.project.read(cx).languages().clone(),
1156 cx,
1157 );
1158 let request = acp::PromptRequest {
1159 prompt: message.clone(),
1160 session_id: self.session_id.clone(),
1161 };
1162 let git_store = self.project.read(cx).git_store().clone();
1163
1164 let message_id = if self
1165 .connection
1166 .session_editor(&self.session_id, cx)
1167 .is_some()
1168 {
1169 Some(UserMessageId::new())
1170 } else {
1171 None
1172 };
1173 self.push_entry(
1174 AgentThreadEntry::UserMessage(UserMessage {
1175 id: message_id.clone(),
1176 content: block,
1177 chunks: message,
1178 checkpoint: None,
1179 }),
1180 cx,
1181 );
1182
1183 self.run_turn(cx, async move |this, cx| {
1184 let old_checkpoint = git_store
1185 .update(cx, |git, cx| git.checkpoint(cx))?
1186 .await
1187 .context("failed to get old checkpoint")
1188 .log_err();
1189 this.update(cx, |this, cx| {
1190 if let Some((_ix, message)) = this.last_user_message() {
1191 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1192 git_checkpoint,
1193 show: false,
1194 });
1195 }
1196 this.connection.prompt(message_id, request, cx)
1197 })?
1198 .await
1199 })
1200 }
1201
1202 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1203 self.run_turn(cx, async move |this, cx| {
1204 this.update(cx, |this, cx| {
1205 this.connection
1206 .resume(&this.session_id, cx)
1207 .map(|resume| resume.run(cx))
1208 })?
1209 .context("resuming a session is not supported")?
1210 .await
1211 })
1212 }
1213
    /// Runs one agent turn driven by `f` and returns a future that resolves
    /// when the turn is over.
    ///
    /// Completed plan entries are cleared first, and any turn that is still
    /// running is cancelled and awaited before `f` starts. The task running `f`
    /// is stored in `send_task`; its result travels back over a oneshot
    /// channel. Once a result arrives (or the sender is dropped), the last
    /// checkpoint is refreshed and either `AcpThreadEvent::Error` or
    /// `AcpThreadEvent::Stopped` is emitted.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        let (tx, rx) = oneshot::channel();
        // Takes the previous `send_task` (if any); awaited below before `f` runs.
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            cancel_task.await;
            // The receiver may already be gone; a failed send is ignored.
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            // Outer `Err` means the sender was dropped without sending.
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    result => {
                        let cancelled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled
                            }))
                        );

                        // We only take the task if the current prompt wasn't cancelled.
                        //
                        // This prompt may have been cancelled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be cancelled.
                        if !cancelled {
                            this.send_task.take();
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
1267
1268 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1269 let Some(send_task) = self.send_task.take() else {
1270 return Task::ready(());
1271 };
1272
1273 for entry in self.entries.iter_mut() {
1274 if let AgentThreadEntry::ToolCall(call) = entry {
1275 let cancel = matches!(
1276 call.status,
1277 ToolCallStatus::WaitingForConfirmation { .. }
1278 | ToolCallStatus::Allowed {
1279 status: acp::ToolCallStatus::InProgress
1280 }
1281 );
1282
1283 if cancel {
1284 call.status = ToolCallStatus::Canceled;
1285 }
1286 }
1287 }
1288
1289 self.connection.cancel(&self.session_id, cx);
1290
1291 // Wait for the send task to complete
1292 cx.foreground_executor().spawn(send_task)
1293 }
1294
1295 /// Rewinds this thread to before the entry at `index`, removing it and all
1296 /// subsequent entries while reverting any changes made from that point.
1297 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1298 let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1299 return Task::ready(Err(anyhow!("not supported")));
1300 };
1301 let Some(message) = self.user_message(&id) else {
1302 return Task::ready(Err(anyhow!("message not found")));
1303 };
1304
1305 let checkpoint = message
1306 .checkpoint
1307 .as_ref()
1308 .map(|c| c.git_checkpoint.clone());
1309
1310 let git_store = self.project.read(cx).git_store().clone();
1311 cx.spawn(async move |this, cx| {
1312 if let Some(checkpoint) = checkpoint {
1313 git_store
1314 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1315 .await?;
1316 }
1317
1318 cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1319 .await?;
1320 this.update(cx, |this, cx| {
1321 if let Some((ix, _)) = this.user_message_mut(&id) {
1322 let range = ix..this.entries.len();
1323 this.entries.truncate(ix);
1324 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1325 }
1326 })
1327 })
1328 }
1329
1330 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1331 let git_store = self.project.read(cx).git_store().clone();
1332
1333 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1334 if let Some(checkpoint) = message.checkpoint.as_ref() {
1335 checkpoint.git_checkpoint.clone()
1336 } else {
1337 return Task::ready(Ok(()));
1338 }
1339 } else {
1340 return Task::ready(Ok(()));
1341 };
1342
1343 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1344 cx.spawn(async move |this, cx| {
1345 let new_checkpoint = new_checkpoint
1346 .await
1347 .context("failed to get new checkpoint")
1348 .log_err();
1349 if let Some(new_checkpoint) = new_checkpoint {
1350 let equal = git_store
1351 .update(cx, |git, cx| {
1352 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1353 })?
1354 .await
1355 .unwrap_or(true);
1356 this.update(cx, |this, cx| {
1357 let (ix, message) = this.last_user_message().context("no user message")?;
1358 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1359 checkpoint.show = !equal;
1360 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1361 anyhow::Ok(())
1362 })??;
1363 }
1364
1365 Ok(())
1366 })
1367 }
1368
1369 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1370 self.entries
1371 .iter_mut()
1372 .enumerate()
1373 .rev()
1374 .find_map(|(ix, entry)| {
1375 if let AgentThreadEntry::UserMessage(message) = entry {
1376 Some((ix, message))
1377 } else {
1378 None
1379 }
1380 })
1381 }
1382
1383 fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1384 self.entries.iter().find_map(|entry| {
1385 if let AgentThreadEntry::UserMessage(message) = entry {
1386 if message.id.as_ref() == Some(&id) {
1387 Some(message)
1388 } else {
1389 None
1390 }
1391 } else {
1392 None
1393 }
1394 })
1395 }
1396
1397 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1398 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1399 if let AgentThreadEntry::UserMessage(message) = entry {
1400 if message.id.as_ref() == Some(&id) {
1401 Some((ix, message))
1402 } else {
1403 None
1404 }
1405 } else {
1406 None
1407 }
1408 })
1409 }
1410
1411 pub fn read_text_file(
1412 &self,
1413 path: PathBuf,
1414 line: Option<u32>,
1415 limit: Option<u32>,
1416 reuse_shared_snapshot: bool,
1417 cx: &mut Context<Self>,
1418 ) -> Task<Result<String>> {
1419 let project = self.project.clone();
1420 let action_log = self.action_log.clone();
1421 cx.spawn(async move |this, cx| {
1422 let load = project.update(cx, |project, cx| {
1423 let path = project
1424 .project_path_for_absolute_path(&path, cx)
1425 .context("invalid path")?;
1426 anyhow::Ok(project.open_buffer(path, cx))
1427 });
1428 let buffer = load??.await?;
1429
1430 let snapshot = if reuse_shared_snapshot {
1431 this.read_with(cx, |this, _| {
1432 this.shared_buffers.get(&buffer.clone()).cloned()
1433 })
1434 .log_err()
1435 .flatten()
1436 } else {
1437 None
1438 };
1439
1440 let snapshot = if let Some(snapshot) = snapshot {
1441 snapshot
1442 } else {
1443 action_log.update(cx, |action_log, cx| {
1444 action_log.buffer_read(buffer.clone(), cx);
1445 })?;
1446 project.update(cx, |project, cx| {
1447 let position = buffer
1448 .read(cx)
1449 .snapshot()
1450 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1451 project.set_agent_location(
1452 Some(AgentLocation {
1453 buffer: buffer.downgrade(),
1454 position,
1455 }),
1456 cx,
1457 );
1458 })?;
1459
1460 buffer.update(cx, |buffer, _| buffer.snapshot())?
1461 };
1462
1463 this.update(cx, |this, _| {
1464 let text = snapshot.text();
1465 this.shared_buffers.insert(buffer.clone(), snapshot);
1466 if line.is_none() && limit.is_none() {
1467 return Ok(text);
1468 }
1469 let limit = limit.unwrap_or(u32::MAX) as usize;
1470 let Some(line) = line else {
1471 return Ok(text.lines().take(limit).collect::<String>());
1472 };
1473
1474 let count = text.lines().count();
1475 if count < line as usize {
1476 anyhow::bail!("There are only {} lines", count);
1477 }
1478 Ok(text
1479 .lines()
1480 .skip(line as usize + 1)
1481 .take(limit)
1482 .collect::<String>())
1483 })?
1484 })
1485 }
1486
1487 pub fn write_text_file(
1488 &self,
1489 path: PathBuf,
1490 content: String,
1491 cx: &mut Context<Self>,
1492 ) -> Task<Result<()>> {
1493 let project = self.project.clone();
1494 let action_log = self.action_log.clone();
1495 cx.spawn(async move |this, cx| {
1496 let load = project.update(cx, |project, cx| {
1497 let path = project
1498 .project_path_for_absolute_path(&path, cx)
1499 .context("invalid path")?;
1500 anyhow::Ok(project.open_buffer(path, cx))
1501 });
1502 let buffer = load??.await?;
1503 let snapshot = this.update(cx, |this, cx| {
1504 this.shared_buffers
1505 .get(&buffer)
1506 .cloned()
1507 .unwrap_or_else(|| buffer.read(cx).snapshot())
1508 })?;
1509 let edits = cx
1510 .background_executor()
1511 .spawn(async move {
1512 let old_text = snapshot.text();
1513 text_diff(old_text.as_str(), &content)
1514 .into_iter()
1515 .map(|(range, replacement)| {
1516 (
1517 snapshot.anchor_after(range.start)
1518 ..snapshot.anchor_before(range.end),
1519 replacement,
1520 )
1521 })
1522 .collect::<Vec<_>>()
1523 })
1524 .await;
1525 cx.update(|cx| {
1526 project.update(cx, |project, cx| {
1527 project.set_agent_location(
1528 Some(AgentLocation {
1529 buffer: buffer.downgrade(),
1530 position: edits
1531 .last()
1532 .map(|(range, _)| range.end)
1533 .unwrap_or(Anchor::MIN),
1534 }),
1535 cx,
1536 );
1537 });
1538
1539 action_log.update(cx, |action_log, cx| {
1540 action_log.buffer_read(buffer.clone(), cx);
1541 });
1542 buffer.update(cx, |buffer, cx| {
1543 buffer.edit(edits, None, cx);
1544 });
1545 action_log.update(cx, |action_log, cx| {
1546 action_log.buffer_edited(buffer.clone(), cx);
1547 });
1548 })?;
1549 project
1550 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1551 .await
1552 })
1553 }
1554
1555 pub fn to_markdown(&self, cx: &App) -> String {
1556 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1557 }
1558
    /// Emits `AcpThreadEvent::ServerExited` carrying the given exit `status`.
    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::ServerExited(status));
    }
1562}
1563
1564fn markdown_for_raw_output(
1565 raw_output: &serde_json::Value,
1566 language_registry: &Arc<LanguageRegistry>,
1567 cx: &mut App,
1568) -> Option<Entity<Markdown>> {
1569 match raw_output {
1570 serde_json::Value::Null => None,
1571 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1572 Markdown::new(
1573 value.to_string().into(),
1574 Some(language_registry.clone()),
1575 None,
1576 cx,
1577 )
1578 })),
1579 serde_json::Value::Number(value) => Some(cx.new(|cx| {
1580 Markdown::new(
1581 value.to_string().into(),
1582 Some(language_registry.clone()),
1583 None,
1584 cx,
1585 )
1586 })),
1587 serde_json::Value::String(value) => Some(cx.new(|cx| {
1588 Markdown::new(
1589 value.clone().into(),
1590 Some(language_registry.clone()),
1591 None,
1592 cx,
1593 )
1594 })),
1595 value => Some(cx.new(|cx| {
1596 Markdown::new(
1597 format!("```json\n{}\n```", value).into(),
1598 Some(language_registry.clone()),
1599 None,
1600 cx,
1601 )
1602 })),
1603 }
1604}
1605
1606#[cfg(test)]
1607mod tests {
1608 use super::*;
1609 use anyhow::anyhow;
1610 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1611 use gpui::{AsyncApp, TestAppContext, WeakEntity};
1612 use indoc::indoc;
1613 use project::{FakeFs, Fs};
1614 use rand::Rng as _;
1615 use serde_json::json;
1616 use settings::SettingsStore;
1617 use smol::stream::StreamExt as _;
1618 use std::{
1619 any::Any,
1620 cell::RefCell,
1621 path::Path,
1622 rc::Rc,
1623 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1624 time::Duration,
1625 };
1626 use util::path;
1627
    /// Shared test setup: initializes logging (ignoring a repeated init),
    /// installs a test `SettingsStore` as a global, and initializes project
    /// settings and the `language` crate.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }
1637
    /// Checks `push_user_content_block`: a first chunk creates a user message,
    /// a following chunk is appended to it (and sets its id), and a chunk
    /// pushed after an assistant message starts a new user message.
    #[gpui::test]
    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Test creating a new user message
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                None,
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "Hello, ".to_string(),
                }),
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, None);
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test appending to existing user message
        let message_1_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_1_id.clone()),
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "world!".to_string(),
                }),
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, Some(message_1_id));
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test creating new user message after assistant message
        thread.update(cx, |thread, cx| {
            thread.push_assistant_content_block(
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "Assistant response".to_string(),
                }),
                false,
                cx,
            );
        });

        let message_2_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_2_id.clone()),
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "New user message".to_string(),
                }),
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 3);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
                assert_eq!(user_msg.id, Some(message_2_id));
                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
            } else {
                panic!("Expected UserMessage at index 2");
            }
        });
    }
1729
    /// Two consecutive `AgentThoughtChunk` updates must be rendered as a single
    /// `<thinking>` section in the thread's markdown.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            |_, thread, mut cx| {
                async move {
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk {
                                    content: "Thinking ".into(),
                                },
                                cx,
                            )
                            .unwrap();
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk {
                                    content: "hard!".into(),
                                },
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            },
        ));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

            "#}
        );
    }
1792
    /// The agent reads a file, the user then inserts a line at the top of the
    /// buffer, and the agent writes its new version. The agent's write must be
    /// merged with the user's edit, both in the buffer and on disk.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // Signals the test body once the fake agent has finished reading the file.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    read_file_tx.take().unwrap().send(()).unwrap();
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            },
        ));

        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        read_file_rx.await.ok();
        // User edit made after the agent's read and before its write.
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
1871
    /// An in-progress tool call becomes `Canceled` when the thread is
    /// cancelled, but a later `ToolCallUpdate` reporting `Completed` must still
    /// move it to `Allowed { Completed }`.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let id = acp::ToolCallId("test".into());

        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let id = id.clone();
            move |_, thread, mut cx| {
                let id = id.clone();
                async move {
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(acp::ToolCall {
                                    id: id.clone(),
                                    title: "Label".into(),
                                    kind: acp::ToolKind::Fetch,
                                    status: acp::ToolCallStatus::InProgress,
                                    content: vec![],
                                    locations: vec![],
                                    raw_input: None,
                                    raw_output: None,
                                }),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::InProgress,
                        ..
                    },
                    ..
                })
            ));
        });

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // The agent reports completion after the local cancel.
        thread
            .update(cx, |thread, cx| {
                thread.handle_session_update(
                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
                        id,
                        fields: acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..Default::default()
                        },
                    }),
                    cx,
                )
            })
            .unwrap();

        request.await.unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp::ToolCallStatus::Completed,
                        ..
                    },
                    ..
                })
            ));
        });
    }
1977
    /// An edit tool call that arrives already `Completed` must not be reported
    /// by `has_pending_edit_tool_calls`.
    #[gpui::test]
    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(path!("/test"), json!({})).await;
        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;

        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            move |_, thread, mut cx| {
                async move {
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(acp::ToolCall {
                                    id: acp::ToolCallId("test".into()),
                                    title: "Label".into(),
                                    kind: acp::ToolKind::Edit,
                                    status: acp::ToolCallStatus::Completed,
                                    content: vec![acp::ToolCallContent::Diff {
                                        diff: acp::Diff {
                                            path: "/test/test.txt".into(),
                                            old_text: None,
                                            new_text: "foo".into(),
                                        },
                                    }],
                                    locations: vec![],
                                    raw_input: None,
                                    raw_output: None,
                                }),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
            .await
            .unwrap();

        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
    }
2031
    /// Exercises checkpoints end to end: a turn that changes the file system
    /// marks its user message with a visible checkpoint, a turn without changes
    /// does not, and `rewind` truncates the thread and restores the files that
    /// existed at that checkpoint.
    #[gpui::test(iterations = 10)]
    async fn test_checkpoints(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(
            path!("/test"),
            json!({
                ".git": {}
            }),
        )
        .await;
        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

        // While set, each prompt creates a new empty file `/test/file-N`.
        let simulate_changes = Arc::new(AtomicBool::new(true));
        let next_filename = Arc::new(AtomicUsize::new(0));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            let fs = fs.clone();
            move |request, thread, mut cx| {
                let fs = fs.clone();
                let simulate_changes = simulate_changes.clone();
                let next_filename = next_filename.clone();
                async move {
                    if simulate_changes.load(SeqCst) {
                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                        fs.write(Path::new(&filename), b"").await?;
                    }

                    // The fake agent answers with the prompt text in upper case.
                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                        panic!("expected text content block");
                    };
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentMessageChunk {
                                    content: content.text.to_uppercase().into(),
                                },
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            }
        }));
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);

        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
        );

        // Checkpoint isn't stored when there are no changes.
        simulate_changes.store(false, SeqCst);
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                    ## User

                    dolor

                    ## Assistant

                    DOLOR

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
        );

        // Rewinding the conversation truncates the history and restores the checkpoint.
        thread
            .update(cx, |thread, cx| {
                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                    panic!("unexpected entries {:?}", thread.entries)
                };
                thread.rewind(message.id.clone().unwrap(), cx)
            })
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
    }
2205
    /// Waits until `thread` contains a tool call entry and returns its index.
    ///
    /// A subscription to the thread's events scans the entries on every event
    /// and reports the first `ToolCall` it finds. Panics if none shows up
    /// within 10 seconds.
    async fn run_until_first_tool_call(
        thread: &Entity<AcpThread>,
        cx: &mut TestAppContext,
    ) -> usize {
        let (mut tx, mut rx) = mpsc::channel::<usize>(1);

        let subscription = cx.update(|cx| {
            cx.subscribe(thread, move |thread, _, cx| {
                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
                        return tx.try_send(ix).unwrap();
                    }
                }
            })
        });

        select! {
            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
                panic!("Timeout waiting for tool call")
            }
            ix = rx.next().fuse() => {
                drop(subscription);
                ix.unwrap()
            }
        }
    }
2232
    /// In-memory `AgentConnection` used by the tests in this module.
    #[derive(Clone, Default)]
    struct FakeAgentConnection {
        /// Auth methods reported by `auth_methods` and accepted by `authenticate`.
        auth_methods: Vec<acp::AuthMethod>,
        /// Threads created through `new_thread`, keyed by their session id.
        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
        /// Optional handler that produces the response to each prompt.
        on_user_message: Option<
            Rc<
                dyn Fn(
                        acp::PromptRequest,
                        WeakEntity<AcpThread>,
                        AsyncApp,
                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
                    + 'static,
            >,
        >,
    }
2248
    impl FakeAgentConnection {
        /// Creates a connection with no auth methods, no sessions and no prompt
        /// handler.
        fn new() -> Self {
            Self {
                auth_methods: Vec::new(),
                on_user_message: None,
                sessions: Arc::default(),
            }
        }

        /// Builder-style setter for the supported auth methods.
        #[expect(unused)]
        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
            self.auth_methods = auth_methods;
            self
        }

        /// Builder-style setter for the handler invoked on every prompt; it
        /// replaces any handler set before.
        fn on_user_message(
            mut self,
            handler: impl Fn(
                acp::PromptRequest,
                WeakEntity<AcpThread>,
                AsyncApp,
            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
            + 'static,
        ) -> Self {
            self.on_user_message.replace(Rc::new(handler));
            self
        }
    }
2277
2278 impl AgentConnection for FakeAgentConnection {
2279 fn auth_methods(&self) -> &[acp::AuthMethod] {
2280 &self.auth_methods
2281 }
2282
2283 fn new_thread(
2284 self: Rc<Self>,
2285 project: Entity<Project>,
2286 _cwd: &Path,
2287 cx: &mut gpui::App,
2288 ) -> Task<gpui::Result<Entity<AcpThread>>> {
2289 let session_id = acp::SessionId(
2290 rand::thread_rng()
2291 .sample_iter(&rand::distributions::Alphanumeric)
2292 .take(7)
2293 .map(char::from)
2294 .collect::<String>()
2295 .into(),
2296 );
2297 let thread =
2298 cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2299 self.sessions.lock().insert(session_id, thread.downgrade());
2300 Task::ready(Ok(thread))
2301 }
2302
2303 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2304 if self.auth_methods().iter().any(|m| m.id == method) {
2305 Task::ready(Ok(()))
2306 } else {
2307 Task::ready(Err(anyhow!("Invalid Auth Method")))
2308 }
2309 }
2310
2311 fn prompt(
2312 &self,
2313 _id: Option<UserMessageId>,
2314 params: acp::PromptRequest,
2315 cx: &mut App,
2316 ) -> Task<gpui::Result<acp::PromptResponse>> {
2317 let sessions = self.sessions.lock();
2318 let thread = sessions.get(¶ms.session_id).unwrap();
2319 if let Some(handler) = &self.on_user_message {
2320 let handler = handler.clone();
2321 let thread = thread.clone();
2322 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2323 } else {
2324 Task::ready(Ok(acp::PromptResponse {
2325 stop_reason: acp::StopReason::EndTurn,
2326 }))
2327 }
2328 }
2329
2330 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2331 let sessions = self.sessions.lock();
2332 let thread = sessions.get(&session_id).unwrap().clone();
2333
2334 cx.spawn(async move |cx| {
2335 thread
2336 .update(cx, |thread, cx| thread.cancel(cx))
2337 .unwrap()
2338 .await
2339 })
2340 .detach();
2341 }
2342
2343 fn session_editor(
2344 &self,
2345 session_id: &acp::SessionId,
2346 _cx: &mut App,
2347 ) -> Option<Rc<dyn AgentSessionEditor>> {
2348 Some(Rc::new(FakeAgentSessionEditor {
2349 _session_id: session_id.clone(),
2350 }))
2351 }
2352
2353 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2354 self
2355 }
2356 }
2357
    /// Session editor handed out by `FakeAgentConnection::session_editor`.
    struct FakeAgentSessionEditor {
        /// Session this editor was created for; stored but never read.
        _session_id: acp::SessionId,
    }
2361
    impl AgentSessionEditor for FakeAgentSessionEditor {
        /// No-op truncation that always succeeds immediately.
        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
2367}