1use std::fmt::Write as _;
2use std::io::Write;
3use std::ops::Range;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Result, anyhow};
8use assistant_settings::AssistantSettings;
9use assistant_tool::{ActionLog, AnyToolCard, Tool, ToolWorkingSet};
10use chrono::{DateTime, Utc};
11use collections::HashMap;
12use editor::display_map::CreaseMetadata;
13use feature_flags::{self, FeatureFlagAppExt};
14use futures::future::Shared;
15use futures::{FutureExt, StreamExt as _};
16use git::repository::DiffType;
17use gpui::{
18 AnyWindowHandle, App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task,
19 WeakEntity,
20};
21use language_model::{
22 ConfiguredModel, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
23 LanguageModelId, LanguageModelKnownError, LanguageModelRegistry, LanguageModelRequest,
24 LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
25 LanguageModelToolUseId, MaxMonthlySpendReachedError, MessageContent,
26 ModelRequestLimitReachedError, PaymentRequiredError, RequestUsage, Role, SelectedModel,
27 StopReason, TokenUsage,
28};
29use postage::stream::Stream as _;
30use project::Project;
31use project::git_store::{GitStore, GitStoreCheckpoint, RepositoryState};
32use prompt_store::{ModelContext, PromptBuilder};
33use proto::Plan;
34use schemars::JsonSchema;
35use serde::{Deserialize, Serialize};
36use settings::Settings;
37use thiserror::Error;
38use util::{ResultExt as _, TryFutureExt as _, post_inc};
39use uuid::Uuid;
40use zed_llm_client::CompletionMode;
41
42use crate::ThreadStore;
43use crate::context::{AgentContext, AgentContextHandle, ContextLoadResult, LoadedContext};
44use crate::thread_store::{
45 SerializedCrease, SerializedLanguageModel, SerializedMessage, SerializedMessageSegment,
46 SerializedThread, SerializedToolResult, SerializedToolUse, SharedProjectContext,
47};
48use crate::tool_use::{PendingToolUse, ToolUse, ToolUseMetadata, ToolUseState};
49
50#[derive(
51 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, JsonSchema,
52)]
53pub struct ThreadId(Arc<str>);
54
55impl ThreadId {
56 pub fn new() -> Self {
57 Self(Uuid::new_v4().to_string().into())
58 }
59}
60
61impl std::fmt::Display for ThreadId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0)
64 }
65}
66
67impl From<&str> for ThreadId {
68 fn from(value: &str) -> Self {
69 Self(value.into())
70 }
71}
72
73/// The ID of the user prompt that initiated a request.
74///
75/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
76#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
77pub struct PromptId(Arc<str>);
78
79impl PromptId {
80 pub fn new() -> Self {
81 Self(Uuid::new_v4().to_string().into())
82 }
83}
84
85impl std::fmt::Display for PromptId {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(f, "{}", self.0)
88 }
89}
90
91#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Serialize, Deserialize)]
92pub struct MessageId(pub(crate) usize);
93
94impl MessageId {
95 fn post_inc(&mut self) -> Self {
96 Self(post_inc(&mut self.0))
97 }
98}
99
100/// Stored information that can be used to resurrect a context crease when creating an editor for a past message.
101#[derive(Clone, Debug)]
102pub struct MessageCrease {
103 pub range: Range<usize>,
104 pub metadata: CreaseMetadata,
105 /// None for a deserialized message, Some otherwise.
106 pub context: Option<AgentContextHandle>,
107}
108
109/// A message in a [`Thread`].
110#[derive(Debug, Clone)]
111pub struct Message {
112 pub id: MessageId,
113 pub role: Role,
114 pub segments: Vec<MessageSegment>,
115 pub loaded_context: LoadedContext,
116 pub creases: Vec<MessageCrease>,
117}
118
119impl Message {
120 /// Returns whether the message contains any meaningful text that should be displayed
121 /// The model sometimes runs tool without producing any text or just a marker ([`USING_TOOL_MARKER`])
122 pub fn should_display_content(&self) -> bool {
123 self.segments.iter().all(|segment| segment.should_display())
124 }
125
126 pub fn push_thinking(&mut self, text: &str, signature: Option<String>) {
127 if let Some(MessageSegment::Thinking {
128 text: segment,
129 signature: current_signature,
130 }) = self.segments.last_mut()
131 {
132 if let Some(signature) = signature {
133 *current_signature = Some(signature);
134 }
135 segment.push_str(text);
136 } else {
137 self.segments.push(MessageSegment::Thinking {
138 text: text.to_string(),
139 signature,
140 });
141 }
142 }
143
144 pub fn push_text(&mut self, text: &str) {
145 if let Some(MessageSegment::Text(segment)) = self.segments.last_mut() {
146 segment.push_str(text);
147 } else {
148 self.segments.push(MessageSegment::Text(text.to_string()));
149 }
150 }
151
152 pub fn to_string(&self) -> String {
153 let mut result = String::new();
154
155 if !self.loaded_context.text.is_empty() {
156 result.push_str(&self.loaded_context.text);
157 }
158
159 for segment in &self.segments {
160 match segment {
161 MessageSegment::Text(text) => result.push_str(text),
162 MessageSegment::Thinking { text, .. } => {
163 result.push_str("<think>\n");
164 result.push_str(text);
165 result.push_str("\n</think>");
166 }
167 MessageSegment::RedactedThinking(_) => {}
168 }
169 }
170
171 result
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
176pub enum MessageSegment {
177 Text(String),
178 Thinking {
179 text: String,
180 signature: Option<String>,
181 },
182 RedactedThinking(Vec<u8>),
183}
184
185impl MessageSegment {
186 pub fn should_display(&self) -> bool {
187 match self {
188 Self::Text(text) => text.is_empty(),
189 Self::Thinking { text, .. } => text.is_empty(),
190 Self::RedactedThinking(_) => false,
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct ProjectSnapshot {
197 pub worktree_snapshots: Vec<WorktreeSnapshot>,
198 pub unsaved_buffer_paths: Vec<String>,
199 pub timestamp: DateTime<Utc>,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct WorktreeSnapshot {
204 pub worktree_path: String,
205 pub git_state: Option<GitState>,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct GitState {
210 pub remote_url: Option<String>,
211 pub head_sha: Option<String>,
212 pub current_branch: Option<String>,
213 pub diff: Option<String>,
214}
215
216#[derive(Clone)]
217pub struct ThreadCheckpoint {
218 message_id: MessageId,
219 git_checkpoint: GitStoreCheckpoint,
220}
221
222#[derive(Copy, Clone, Debug, PartialEq, Eq)]
223pub enum ThreadFeedback {
224 Positive,
225 Negative,
226}
227
228pub enum LastRestoreCheckpoint {
229 Pending {
230 message_id: MessageId,
231 },
232 Error {
233 message_id: MessageId,
234 error: String,
235 },
236}
237
238impl LastRestoreCheckpoint {
239 pub fn message_id(&self) -> MessageId {
240 match self {
241 LastRestoreCheckpoint::Pending { message_id } => *message_id,
242 LastRestoreCheckpoint::Error { message_id, .. } => *message_id,
243 }
244 }
245}
246
247#[derive(Clone, Debug, Default, Serialize, Deserialize)]
248pub enum DetailedSummaryState {
249 #[default]
250 NotGenerated,
251 Generating {
252 message_id: MessageId,
253 },
254 Generated {
255 text: SharedString,
256 message_id: MessageId,
257 },
258}
259
260impl DetailedSummaryState {
261 fn text(&self) -> Option<SharedString> {
262 if let Self::Generated { text, .. } = self {
263 Some(text.clone())
264 } else {
265 None
266 }
267 }
268}
269
270#[derive(Default)]
271pub struct TotalTokenUsage {
272 pub total: usize,
273 pub max: usize,
274}
275
276impl TotalTokenUsage {
277 pub fn ratio(&self) -> TokenUsageRatio {
278 #[cfg(debug_assertions)]
279 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
280 .unwrap_or("0.8".to_string())
281 .parse()
282 .unwrap();
283 #[cfg(not(debug_assertions))]
284 let warning_threshold: f32 = 0.8;
285
286 // When the maximum is unknown because there is no selected model,
287 // avoid showing the token limit warning.
288 if self.max == 0 {
289 TokenUsageRatio::Normal
290 } else if self.total >= self.max {
291 TokenUsageRatio::Exceeded
292 } else if self.total as f32 / self.max as f32 >= warning_threshold {
293 TokenUsageRatio::Warning
294 } else {
295 TokenUsageRatio::Normal
296 }
297 }
298
299 pub fn add(&self, tokens: usize) -> TotalTokenUsage {
300 TotalTokenUsage {
301 total: self.total + tokens,
302 max: self.max,
303 }
304 }
305}
306
307#[derive(Debug, Default, PartialEq, Eq)]
308pub enum TokenUsageRatio {
309 #[default]
310 Normal,
311 Warning,
312 Exceeded,
313}
314
315fn default_completion_mode(cx: &App) -> CompletionMode {
316 if cx.is_staff() {
317 CompletionMode::Max
318 } else {
319 CompletionMode::Normal
320 }
321}
322
323#[derive(Debug, Clone, Copy)]
324pub enum QueueState {
325 Sending,
326 Queued { position: usize },
327 Started,
328}
329
330/// A thread of conversation with the LLM.
331pub struct Thread {
332 id: ThreadId,
333 updated_at: DateTime<Utc>,
334 summary: Option<SharedString>,
335 pending_summary: Task<Option<()>>,
336 detailed_summary_task: Task<Option<()>>,
337 detailed_summary_tx: postage::watch::Sender<DetailedSummaryState>,
338 detailed_summary_rx: postage::watch::Receiver<DetailedSummaryState>,
339 completion_mode: CompletionMode,
340 messages: Vec<Message>,
341 next_message_id: MessageId,
342 last_prompt_id: PromptId,
343 project_context: SharedProjectContext,
344 checkpoints_by_message: HashMap<MessageId, ThreadCheckpoint>,
345 completion_count: usize,
346 pending_completions: Vec<PendingCompletion>,
347 project: Entity<Project>,
348 prompt_builder: Arc<PromptBuilder>,
349 tools: Entity<ToolWorkingSet>,
350 tool_use: ToolUseState,
351 action_log: Entity<ActionLog>,
352 last_restore_checkpoint: Option<LastRestoreCheckpoint>,
353 pending_checkpoint: Option<ThreadCheckpoint>,
354 initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
355 request_token_usage: Vec<TokenUsage>,
356 cumulative_token_usage: TokenUsage,
357 exceeded_window_error: Option<ExceededWindowError>,
358 feedback: Option<ThreadFeedback>,
359 message_feedback: HashMap<MessageId, ThreadFeedback>,
360 last_auto_capture_at: Option<Instant>,
361 request_callback: Option<
362 Box<dyn FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>])>,
363 >,
364 remaining_turns: u32,
365 configured_model: Option<ConfiguredModel>,
366}
367
368#[derive(Debug, Clone, Serialize, Deserialize)]
369pub struct ExceededWindowError {
370 /// Model used when last message exceeded context window
371 model_id: LanguageModelId,
372 /// Token count including last message
373 token_count: usize,
374}
375
376impl Thread {
377 pub fn new(
378 project: Entity<Project>,
379 tools: Entity<ToolWorkingSet>,
380 prompt_builder: Arc<PromptBuilder>,
381 system_prompt: SharedProjectContext,
382 cx: &mut Context<Self>,
383 ) -> Self {
384 let (detailed_summary_tx, detailed_summary_rx) = postage::watch::channel();
385 let configured_model = LanguageModelRegistry::read_global(cx).default_model();
386
387 Self {
388 id: ThreadId::new(),
389 updated_at: Utc::now(),
390 summary: None,
391 pending_summary: Task::ready(None),
392 detailed_summary_task: Task::ready(None),
393 detailed_summary_tx,
394 detailed_summary_rx,
395 completion_mode: default_completion_mode(cx),
396 messages: Vec::new(),
397 next_message_id: MessageId(0),
398 last_prompt_id: PromptId::new(),
399 project_context: system_prompt,
400 checkpoints_by_message: HashMap::default(),
401 completion_count: 0,
402 pending_completions: Vec::new(),
403 project: project.clone(),
404 prompt_builder,
405 tools: tools.clone(),
406 last_restore_checkpoint: None,
407 pending_checkpoint: None,
408 tool_use: ToolUseState::new(tools.clone()),
409 action_log: cx.new(|_| ActionLog::new(project.clone())),
410 initial_project_snapshot: {
411 let project_snapshot = Self::project_snapshot(project, cx);
412 cx.foreground_executor()
413 .spawn(async move { Some(project_snapshot.await) })
414 .shared()
415 },
416 request_token_usage: Vec::new(),
417 cumulative_token_usage: TokenUsage::default(),
418 exceeded_window_error: None,
419 feedback: None,
420 message_feedback: HashMap::default(),
421 last_auto_capture_at: None,
422 request_callback: None,
423 remaining_turns: u32::MAX,
424 configured_model,
425 }
426 }
427
428 pub fn deserialize(
429 id: ThreadId,
430 serialized: SerializedThread,
431 project: Entity<Project>,
432 tools: Entity<ToolWorkingSet>,
433 prompt_builder: Arc<PromptBuilder>,
434 project_context: SharedProjectContext,
435 cx: &mut Context<Self>,
436 ) -> Self {
437 let next_message_id = MessageId(
438 serialized
439 .messages
440 .last()
441 .map(|message| message.id.0 + 1)
442 .unwrap_or(0),
443 );
444 let tool_use = ToolUseState::from_serialized_messages(tools.clone(), &serialized.messages);
445 let (detailed_summary_tx, detailed_summary_rx) =
446 postage::watch::channel_with(serialized.detailed_summary_state);
447
448 let configured_model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
449 serialized
450 .model
451 .and_then(|model| {
452 let model = SelectedModel {
453 provider: model.provider.clone().into(),
454 model: model.model.clone().into(),
455 };
456 registry.select_model(&model, cx)
457 })
458 .or_else(|| registry.default_model())
459 });
460
461 Self {
462 id,
463 updated_at: serialized.updated_at,
464 summary: Some(serialized.summary),
465 pending_summary: Task::ready(None),
466 detailed_summary_task: Task::ready(None),
467 detailed_summary_tx,
468 detailed_summary_rx,
469 completion_mode: default_completion_mode(cx),
470 messages: serialized
471 .messages
472 .into_iter()
473 .map(|message| Message {
474 id: message.id,
475 role: message.role,
476 segments: message
477 .segments
478 .into_iter()
479 .map(|segment| match segment {
480 SerializedMessageSegment::Text { text } => MessageSegment::Text(text),
481 SerializedMessageSegment::Thinking { text, signature } => {
482 MessageSegment::Thinking { text, signature }
483 }
484 SerializedMessageSegment::RedactedThinking { data } => {
485 MessageSegment::RedactedThinking(data)
486 }
487 })
488 .collect(),
489 loaded_context: LoadedContext {
490 contexts: Vec::new(),
491 text: message.context,
492 images: Vec::new(),
493 },
494 creases: message
495 .creases
496 .into_iter()
497 .map(|crease| MessageCrease {
498 range: crease.start..crease.end,
499 metadata: CreaseMetadata {
500 icon_path: crease.icon_path,
501 label: crease.label,
502 },
503 context: None,
504 })
505 .collect(),
506 })
507 .collect(),
508 next_message_id,
509 last_prompt_id: PromptId::new(),
510 project_context,
511 checkpoints_by_message: HashMap::default(),
512 completion_count: 0,
513 pending_completions: Vec::new(),
514 last_restore_checkpoint: None,
515 pending_checkpoint: None,
516 project: project.clone(),
517 prompt_builder,
518 tools,
519 tool_use,
520 action_log: cx.new(|_| ActionLog::new(project)),
521 initial_project_snapshot: Task::ready(serialized.initial_project_snapshot).shared(),
522 request_token_usage: serialized.request_token_usage,
523 cumulative_token_usage: serialized.cumulative_token_usage,
524 exceeded_window_error: None,
525 feedback: None,
526 message_feedback: HashMap::default(),
527 last_auto_capture_at: None,
528 request_callback: None,
529 remaining_turns: u32::MAX,
530 configured_model,
531 }
532 }
533
534 pub fn set_request_callback(
535 &mut self,
536 callback: impl 'static
537 + FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>]),
538 ) {
539 self.request_callback = Some(Box::new(callback));
540 }
541
542 pub fn id(&self) -> &ThreadId {
543 &self.id
544 }
545
546 pub fn is_empty(&self) -> bool {
547 self.messages.is_empty()
548 }
549
550 pub fn updated_at(&self) -> DateTime<Utc> {
551 self.updated_at
552 }
553
554 pub fn touch_updated_at(&mut self) {
555 self.updated_at = Utc::now();
556 }
557
558 pub fn advance_prompt_id(&mut self) {
559 self.last_prompt_id = PromptId::new();
560 }
561
562 pub fn summary(&self) -> Option<SharedString> {
563 self.summary.clone()
564 }
565
566 pub fn project_context(&self) -> SharedProjectContext {
567 self.project_context.clone()
568 }
569
570 pub fn get_or_init_configured_model(&mut self, cx: &App) -> Option<ConfiguredModel> {
571 if self.configured_model.is_none() {
572 self.configured_model = LanguageModelRegistry::read_global(cx).default_model();
573 }
574 self.configured_model.clone()
575 }
576
577 pub fn configured_model(&self) -> Option<ConfiguredModel> {
578 self.configured_model.clone()
579 }
580
581 pub fn set_configured_model(&mut self, model: Option<ConfiguredModel>, cx: &mut Context<Self>) {
582 self.configured_model = model;
583 cx.notify();
584 }
585
586 pub const DEFAULT_SUMMARY: SharedString = SharedString::new_static("New Thread");
587
588 pub fn summary_or_default(&self) -> SharedString {
589 self.summary.clone().unwrap_or(Self::DEFAULT_SUMMARY)
590 }
591
592 pub fn set_summary(&mut self, new_summary: impl Into<SharedString>, cx: &mut Context<Self>) {
593 let Some(current_summary) = &self.summary else {
594 // Don't allow setting summary until generated
595 return;
596 };
597
598 let mut new_summary = new_summary.into();
599
600 if new_summary.is_empty() {
601 new_summary = Self::DEFAULT_SUMMARY;
602 }
603
604 if current_summary != &new_summary {
605 self.summary = Some(new_summary);
606 cx.emit(ThreadEvent::SummaryChanged);
607 }
608 }
609
610 pub fn completion_mode(&self) -> CompletionMode {
611 self.completion_mode
612 }
613
614 pub fn set_completion_mode(&mut self, mode: CompletionMode) {
615 self.completion_mode = mode;
616 }
617
618 pub fn message(&self, id: MessageId) -> Option<&Message> {
619 let index = self
620 .messages
621 .binary_search_by(|message| message.id.cmp(&id))
622 .ok()?;
623
624 self.messages.get(index)
625 }
626
627 pub fn messages(&self) -> impl ExactSizeIterator<Item = &Message> {
628 self.messages.iter()
629 }
630
631 pub fn is_generating(&self) -> bool {
632 !self.pending_completions.is_empty() || !self.all_tools_finished()
633 }
634
635 pub fn queue_state(&self) -> Option<QueueState> {
636 self.pending_completions
637 .first()
638 .map(|pending_completion| pending_completion.queue_state)
639 }
640
641 pub fn tools(&self) -> &Entity<ToolWorkingSet> {
642 &self.tools
643 }
644
645 pub fn pending_tool(&self, id: &LanguageModelToolUseId) -> Option<&PendingToolUse> {
646 self.tool_use
647 .pending_tool_uses()
648 .into_iter()
649 .find(|tool_use| &tool_use.id == id)
650 }
651
652 pub fn tools_needing_confirmation(&self) -> impl Iterator<Item = &PendingToolUse> {
653 self.tool_use
654 .pending_tool_uses()
655 .into_iter()
656 .filter(|tool_use| tool_use.status.needs_confirmation())
657 }
658
659 pub fn has_pending_tool_uses(&self) -> bool {
660 !self.tool_use.pending_tool_uses().is_empty()
661 }
662
663 pub fn checkpoint_for_message(&self, id: MessageId) -> Option<ThreadCheckpoint> {
664 self.checkpoints_by_message.get(&id).cloned()
665 }
666
667 pub fn restore_checkpoint(
668 &mut self,
669 checkpoint: ThreadCheckpoint,
670 cx: &mut Context<Self>,
671 ) -> Task<Result<()>> {
672 self.last_restore_checkpoint = Some(LastRestoreCheckpoint::Pending {
673 message_id: checkpoint.message_id,
674 });
675 cx.emit(ThreadEvent::CheckpointChanged);
676 cx.notify();
677
678 let git_store = self.project().read(cx).git_store().clone();
679 let restore = git_store.update(cx, |git_store, cx| {
680 git_store.restore_checkpoint(checkpoint.git_checkpoint.clone(), cx)
681 });
682
683 cx.spawn(async move |this, cx| {
684 let result = restore.await;
685 this.update(cx, |this, cx| {
686 if let Err(err) = result.as_ref() {
687 this.last_restore_checkpoint = Some(LastRestoreCheckpoint::Error {
688 message_id: checkpoint.message_id,
689 error: err.to_string(),
690 });
691 } else {
692 this.truncate(checkpoint.message_id, cx);
693 this.last_restore_checkpoint = None;
694 }
695 this.pending_checkpoint = None;
696 cx.emit(ThreadEvent::CheckpointChanged);
697 cx.notify();
698 })?;
699 result
700 })
701 }
702
703 fn finalize_pending_checkpoint(&mut self, cx: &mut Context<Self>) {
704 let pending_checkpoint = if self.is_generating() {
705 return;
706 } else if let Some(checkpoint) = self.pending_checkpoint.take() {
707 checkpoint
708 } else {
709 return;
710 };
711
712 let git_store = self.project.read(cx).git_store().clone();
713 let final_checkpoint = git_store.update(cx, |git_store, cx| git_store.checkpoint(cx));
714 cx.spawn(async move |this, cx| match final_checkpoint.await {
715 Ok(final_checkpoint) => {
716 let equal = git_store
717 .update(cx, |store, cx| {
718 store.compare_checkpoints(
719 pending_checkpoint.git_checkpoint.clone(),
720 final_checkpoint.clone(),
721 cx,
722 )
723 })?
724 .await
725 .unwrap_or(false);
726
727 if !equal {
728 this.update(cx, |this, cx| {
729 this.insert_checkpoint(pending_checkpoint, cx)
730 })?;
731 }
732
733 Ok(())
734 }
735 Err(_) => this.update(cx, |this, cx| {
736 this.insert_checkpoint(pending_checkpoint, cx)
737 }),
738 })
739 .detach();
740 }
741
742 fn insert_checkpoint(&mut self, checkpoint: ThreadCheckpoint, cx: &mut Context<Self>) {
743 self.checkpoints_by_message
744 .insert(checkpoint.message_id, checkpoint);
745 cx.emit(ThreadEvent::CheckpointChanged);
746 cx.notify();
747 }
748
749 pub fn last_restore_checkpoint(&self) -> Option<&LastRestoreCheckpoint> {
750 self.last_restore_checkpoint.as_ref()
751 }
752
753 pub fn truncate(&mut self, message_id: MessageId, cx: &mut Context<Self>) {
754 let Some(message_ix) = self
755 .messages
756 .iter()
757 .rposition(|message| message.id == message_id)
758 else {
759 return;
760 };
761 for deleted_message in self.messages.drain(message_ix..) {
762 self.checkpoints_by_message.remove(&deleted_message.id);
763 }
764 cx.notify();
765 }
766
767 pub fn context_for_message(&self, id: MessageId) -> impl Iterator<Item = &AgentContext> {
768 self.messages
769 .iter()
770 .find(|message| message.id == id)
771 .into_iter()
772 .flat_map(|message| message.loaded_context.contexts.iter())
773 }
774
775 pub fn is_turn_end(&self, ix: usize) -> bool {
776 if self.messages.is_empty() {
777 return false;
778 }
779
780 if !self.is_generating() && ix == self.messages.len() - 1 {
781 return true;
782 }
783
784 let Some(message) = self.messages.get(ix) else {
785 return false;
786 };
787
788 if message.role != Role::Assistant {
789 return false;
790 }
791
792 self.messages
793 .get(ix + 1)
794 .and_then(|message| {
795 self.message(message.id)
796 .map(|next_message| next_message.role == Role::User)
797 })
798 .unwrap_or(false)
799 }
800
801 /// Returns whether all of the tool uses have finished running.
802 pub fn all_tools_finished(&self) -> bool {
803 // If the only pending tool uses left are the ones with errors, then
804 // that means that we've finished running all of the pending tools.
805 self.tool_use
806 .pending_tool_uses()
807 .iter()
808 .all(|tool_use| tool_use.status.is_error())
809 }
810
811 pub fn tool_uses_for_message(&self, id: MessageId, cx: &App) -> Vec<ToolUse> {
812 self.tool_use.tool_uses_for_message(id, cx)
813 }
814
815 pub fn tool_results_for_message(
816 &self,
817 assistant_message_id: MessageId,
818 ) -> Vec<&LanguageModelToolResult> {
819 self.tool_use.tool_results_for_message(assistant_message_id)
820 }
821
822 pub fn tool_result(&self, id: &LanguageModelToolUseId) -> Option<&LanguageModelToolResult> {
823 self.tool_use.tool_result(id)
824 }
825
826 pub fn output_for_tool(&self, id: &LanguageModelToolUseId) -> Option<&Arc<str>> {
827 Some(&self.tool_use.tool_result(id)?.content)
828 }
829
830 pub fn card_for_tool(&self, id: &LanguageModelToolUseId) -> Option<AnyToolCard> {
831 self.tool_use.tool_result_card(id).cloned()
832 }
833
834 /// Return tools that are both enabled and supported by the model
835 pub fn available_tools(
836 &self,
837 cx: &App,
838 model: Arc<dyn LanguageModel>,
839 ) -> Vec<LanguageModelRequestTool> {
840 if model.supports_tools() {
841 self.tools()
842 .read(cx)
843 .enabled_tools(cx)
844 .into_iter()
845 .filter_map(|tool| {
846 // Skip tools that cannot be supported
847 let input_schema = tool.input_schema(model.tool_input_format()).ok()?;
848 Some(LanguageModelRequestTool {
849 name: tool.name(),
850 description: tool.description(),
851 input_schema,
852 })
853 })
854 .collect()
855 } else {
856 Vec::default()
857 }
858 }
859
860 pub fn insert_user_message(
861 &mut self,
862 text: impl Into<String>,
863 loaded_context: ContextLoadResult,
864 git_checkpoint: Option<GitStoreCheckpoint>,
865 creases: Vec<MessageCrease>,
866 cx: &mut Context<Self>,
867 ) -> MessageId {
868 if !loaded_context.referenced_buffers.is_empty() {
869 self.action_log.update(cx, |log, cx| {
870 for buffer in loaded_context.referenced_buffers {
871 log.track_buffer(buffer, cx);
872 }
873 });
874 }
875
876 let message_id = self.insert_message(
877 Role::User,
878 vec![MessageSegment::Text(text.into())],
879 loaded_context.loaded_context,
880 creases,
881 cx,
882 );
883
884 if let Some(git_checkpoint) = git_checkpoint {
885 self.pending_checkpoint = Some(ThreadCheckpoint {
886 message_id,
887 git_checkpoint,
888 });
889 }
890
891 self.auto_capture_telemetry(cx);
892
893 message_id
894 }
895
896 pub fn insert_assistant_message(
897 &mut self,
898 segments: Vec<MessageSegment>,
899 cx: &mut Context<Self>,
900 ) -> MessageId {
901 self.insert_message(
902 Role::Assistant,
903 segments,
904 LoadedContext::default(),
905 Vec::new(),
906 cx,
907 )
908 }
909
910 pub fn insert_message(
911 &mut self,
912 role: Role,
913 segments: Vec<MessageSegment>,
914 loaded_context: LoadedContext,
915 creases: Vec<MessageCrease>,
916 cx: &mut Context<Self>,
917 ) -> MessageId {
918 let id = self.next_message_id.post_inc();
919 self.messages.push(Message {
920 id,
921 role,
922 segments,
923 loaded_context,
924 creases,
925 });
926 self.touch_updated_at();
927 cx.emit(ThreadEvent::MessageAdded(id));
928 id
929 }
930
931 pub fn edit_message(
932 &mut self,
933 id: MessageId,
934 new_role: Role,
935 new_segments: Vec<MessageSegment>,
936 loaded_context: Option<LoadedContext>,
937 cx: &mut Context<Self>,
938 ) -> bool {
939 let Some(message) = self.messages.iter_mut().find(|message| message.id == id) else {
940 return false;
941 };
942 message.role = new_role;
943 message.segments = new_segments;
944 if let Some(context) = loaded_context {
945 message.loaded_context = context;
946 }
947 self.touch_updated_at();
948 cx.emit(ThreadEvent::MessageEdited(id));
949 true
950 }
951
952 pub fn delete_message(&mut self, id: MessageId, cx: &mut Context<Self>) -> bool {
953 let Some(index) = self.messages.iter().position(|message| message.id == id) else {
954 return false;
955 };
956 self.messages.remove(index);
957 self.touch_updated_at();
958 cx.emit(ThreadEvent::MessageDeleted(id));
959 true
960 }
961
962 /// Returns the representation of this [`Thread`] in a textual form.
963 ///
964 /// This is the representation we use when attaching a thread as context to another thread.
965 pub fn text(&self) -> String {
966 let mut text = String::new();
967
968 for message in &self.messages {
969 text.push_str(match message.role {
970 language_model::Role::User => "User:",
971 language_model::Role::Assistant => "Assistant:",
972 language_model::Role::System => "System:",
973 });
974 text.push('\n');
975
976 for segment in &message.segments {
977 match segment {
978 MessageSegment::Text(content) => text.push_str(content),
979 MessageSegment::Thinking { text: content, .. } => {
980 text.push_str(&format!("<think>{}</think>", content))
981 }
982 MessageSegment::RedactedThinking(_) => {}
983 }
984 }
985 text.push('\n');
986 }
987
988 text
989 }
990
991 /// Serializes this thread into a format for storage or telemetry.
992 pub fn serialize(&self, cx: &mut Context<Self>) -> Task<Result<SerializedThread>> {
993 let initial_project_snapshot = self.initial_project_snapshot.clone();
994 cx.spawn(async move |this, cx| {
995 let initial_project_snapshot = initial_project_snapshot.await;
996 this.read_with(cx, |this, cx| SerializedThread {
997 version: SerializedThread::VERSION.to_string(),
998 summary: this.summary_or_default(),
999 updated_at: this.updated_at(),
1000 messages: this
1001 .messages()
1002 .map(|message| SerializedMessage {
1003 id: message.id,
1004 role: message.role,
1005 segments: message
1006 .segments
1007 .iter()
1008 .map(|segment| match segment {
1009 MessageSegment::Text(text) => {
1010 SerializedMessageSegment::Text { text: text.clone() }
1011 }
1012 MessageSegment::Thinking { text, signature } => {
1013 SerializedMessageSegment::Thinking {
1014 text: text.clone(),
1015 signature: signature.clone(),
1016 }
1017 }
1018 MessageSegment::RedactedThinking(data) => {
1019 SerializedMessageSegment::RedactedThinking {
1020 data: data.clone(),
1021 }
1022 }
1023 })
1024 .collect(),
1025 tool_uses: this
1026 .tool_uses_for_message(message.id, cx)
1027 .into_iter()
1028 .map(|tool_use| SerializedToolUse {
1029 id: tool_use.id,
1030 name: tool_use.name,
1031 input: tool_use.input,
1032 })
1033 .collect(),
1034 tool_results: this
1035 .tool_results_for_message(message.id)
1036 .into_iter()
1037 .map(|tool_result| SerializedToolResult {
1038 tool_use_id: tool_result.tool_use_id.clone(),
1039 is_error: tool_result.is_error,
1040 content: tool_result.content.clone(),
1041 })
1042 .collect(),
1043 context: message.loaded_context.text.clone(),
1044 creases: message
1045 .creases
1046 .iter()
1047 .map(|crease| SerializedCrease {
1048 start: crease.range.start,
1049 end: crease.range.end,
1050 icon_path: crease.metadata.icon_path.clone(),
1051 label: crease.metadata.label.clone(),
1052 })
1053 .collect(),
1054 })
1055 .collect(),
1056 initial_project_snapshot,
1057 cumulative_token_usage: this.cumulative_token_usage,
1058 request_token_usage: this.request_token_usage.clone(),
1059 detailed_summary_state: this.detailed_summary_rx.borrow().clone(),
1060 exceeded_window_error: this.exceeded_window_error.clone(),
1061 model: this
1062 .configured_model
1063 .as_ref()
1064 .map(|model| SerializedLanguageModel {
1065 provider: model.provider.id().0.to_string(),
1066 model: model.model.id().0.to_string(),
1067 }),
1068 })
1069 })
1070 }
1071
1072 pub fn remaining_turns(&self) -> u32 {
1073 self.remaining_turns
1074 }
1075
1076 pub fn set_remaining_turns(&mut self, remaining_turns: u32) {
1077 self.remaining_turns = remaining_turns;
1078 }
1079
1080 pub fn send_to_model(
1081 &mut self,
1082 model: Arc<dyn LanguageModel>,
1083 window: Option<AnyWindowHandle>,
1084 cx: &mut Context<Self>,
1085 ) {
1086 if self.remaining_turns == 0 {
1087 return;
1088 }
1089
1090 self.remaining_turns -= 1;
1091
1092 let request = self.to_completion_request(model.clone(), cx);
1093
1094 self.stream_completion(request, model, window, cx);
1095 }
1096
1097 pub fn used_tools_since_last_user_message(&self) -> bool {
1098 for message in self.messages.iter().rev() {
1099 if self.tool_use.message_has_tool_results(message.id) {
1100 return true;
1101 } else if message.role == Role::User {
1102 return false;
1103 }
1104 }
1105
1106 false
1107 }
1108
1109 pub fn to_completion_request(
1110 &self,
1111 model: Arc<dyn LanguageModel>,
1112 cx: &mut Context<Self>,
1113 ) -> LanguageModelRequest {
1114 let mut request = LanguageModelRequest {
1115 thread_id: Some(self.id.to_string()),
1116 prompt_id: Some(self.last_prompt_id.to_string()),
1117 mode: None,
1118 messages: vec![],
1119 tools: Vec::new(),
1120 stop: Vec::new(),
1121 temperature: None,
1122 };
1123
1124 let available_tools = self.available_tools(cx, model.clone());
1125 let available_tool_names = available_tools
1126 .iter()
1127 .map(|tool| tool.name.clone())
1128 .collect();
1129
1130 let model_context = &ModelContext {
1131 available_tools: available_tool_names,
1132 };
1133
1134 if let Some(project_context) = self.project_context.borrow().as_ref() {
1135 match self
1136 .prompt_builder
1137 .generate_assistant_system_prompt(project_context, model_context)
1138 {
1139 Err(err) => {
1140 let message = format!("{err:?}").into();
1141 log::error!("{message}");
1142 cx.emit(ThreadEvent::ShowError(ThreadError::Message {
1143 header: "Error generating system prompt".into(),
1144 message,
1145 }));
1146 }
1147 Ok(system_prompt) => {
1148 request.messages.push(LanguageModelRequestMessage {
1149 role: Role::System,
1150 content: vec![MessageContent::Text(system_prompt)],
1151 cache: true,
1152 });
1153 }
1154 }
1155 } else {
1156 let message = "Context for system prompt unexpectedly not ready.".into();
1157 log::error!("{message}");
1158 cx.emit(ThreadEvent::ShowError(ThreadError::Message {
1159 header: "Error generating system prompt".into(),
1160 message,
1161 }));
1162 }
1163
1164 for message in &self.messages {
1165 let mut request_message = LanguageModelRequestMessage {
1166 role: message.role,
1167 content: Vec::new(),
1168 cache: false,
1169 };
1170
1171 message
1172 .loaded_context
1173 .add_to_request_message(&mut request_message);
1174
1175 for segment in &message.segments {
1176 match segment {
1177 MessageSegment::Text(text) => {
1178 if !text.is_empty() {
1179 request_message
1180 .content
1181 .push(MessageContent::Text(text.into()));
1182 }
1183 }
1184 MessageSegment::Thinking { text, signature } => {
1185 if !text.is_empty() {
1186 request_message.content.push(MessageContent::Thinking {
1187 text: text.into(),
1188 signature: signature.clone(),
1189 });
1190 }
1191 }
1192 MessageSegment::RedactedThinking(data) => {
1193 request_message
1194 .content
1195 .push(MessageContent::RedactedThinking(data.clone()));
1196 }
1197 };
1198 }
1199
1200 self.tool_use
1201 .attach_tool_uses(message.id, &mut request_message);
1202
1203 request.messages.push(request_message);
1204
1205 if let Some(tool_results_message) = self.tool_use.tool_results_message(message.id) {
1206 request.messages.push(tool_results_message);
1207 }
1208 }
1209
1210 // https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
1211 if let Some(last) = request.messages.last_mut() {
1212 last.cache = true;
1213 }
1214
1215 self.attached_tracked_files_state(&mut request.messages, cx);
1216
1217 request.tools = available_tools;
1218 request.mode = if model.supports_max_mode() {
1219 Some(self.completion_mode)
1220 } else {
1221 Some(CompletionMode::Normal)
1222 };
1223
1224 request
1225 }
1226
1227 fn to_summarize_request(&self, added_user_message: String) -> LanguageModelRequest {
1228 let mut request = LanguageModelRequest {
1229 thread_id: None,
1230 prompt_id: None,
1231 mode: None,
1232 messages: vec![],
1233 tools: Vec::new(),
1234 stop: Vec::new(),
1235 temperature: None,
1236 };
1237
1238 for message in &self.messages {
1239 let mut request_message = LanguageModelRequestMessage {
1240 role: message.role,
1241 content: Vec::new(),
1242 cache: false,
1243 };
1244
1245 for segment in &message.segments {
1246 match segment {
1247 MessageSegment::Text(text) => request_message
1248 .content
1249 .push(MessageContent::Text(text.clone())),
1250 MessageSegment::Thinking { .. } => {}
1251 MessageSegment::RedactedThinking(_) => {}
1252 }
1253 }
1254
1255 if request_message.content.is_empty() {
1256 continue;
1257 }
1258
1259 request.messages.push(request_message);
1260 }
1261
1262 request.messages.push(LanguageModelRequestMessage {
1263 role: Role::User,
1264 content: vec![MessageContent::Text(added_user_message)],
1265 cache: false,
1266 });
1267
1268 request
1269 }
1270
1271 fn attached_tracked_files_state(
1272 &self,
1273 messages: &mut Vec<LanguageModelRequestMessage>,
1274 cx: &App,
1275 ) {
1276 const STALE_FILES_HEADER: &str = "These files changed since last read:";
1277
1278 let mut stale_message = String::new();
1279
1280 let action_log = self.action_log.read(cx);
1281
1282 for stale_file in action_log.stale_buffers(cx) {
1283 let Some(file) = stale_file.read(cx).file() else {
1284 continue;
1285 };
1286
1287 if stale_message.is_empty() {
1288 write!(&mut stale_message, "{}\n", STALE_FILES_HEADER).ok();
1289 }
1290
1291 writeln!(&mut stale_message, "- {}", file.path().display()).ok();
1292 }
1293
1294 let mut content = Vec::with_capacity(2);
1295
1296 if !stale_message.is_empty() {
1297 content.push(stale_message.into());
1298 }
1299
1300 if !content.is_empty() {
1301 let context_message = LanguageModelRequestMessage {
1302 role: Role::User,
1303 content,
1304 cache: false,
1305 };
1306
1307 messages.push(context_message);
1308 }
1309 }
1310
1311 pub fn stream_completion(
1312 &mut self,
1313 request: LanguageModelRequest,
1314 model: Arc<dyn LanguageModel>,
1315 window: Option<AnyWindowHandle>,
1316 cx: &mut Context<Self>,
1317 ) {
1318 let pending_completion_id = post_inc(&mut self.completion_count);
1319 let mut request_callback_parameters = if self.request_callback.is_some() {
1320 Some((request.clone(), Vec::new()))
1321 } else {
1322 None
1323 };
1324 let prompt_id = self.last_prompt_id.clone();
1325 let tool_use_metadata = ToolUseMetadata {
1326 model: model.clone(),
1327 thread_id: self.id.clone(),
1328 prompt_id: prompt_id.clone(),
1329 };
1330
1331 let task = cx.spawn(async move |thread, cx| {
1332 let stream_completion_future = model.stream_completion_with_usage(request, &cx);
1333 let initial_token_usage =
1334 thread.read_with(cx, |thread, _cx| thread.cumulative_token_usage);
1335 let stream_completion = async {
1336 let (mut events, usage) = stream_completion_future.await?;
1337
1338 let mut stop_reason = StopReason::EndTurn;
1339 let mut current_token_usage = TokenUsage::default();
1340
1341 thread
1342 .update(cx, |_thread, cx| {
1343 if let Some(usage) = usage {
1344 cx.emit(ThreadEvent::UsageUpdated(usage));
1345 }
1346 cx.emit(ThreadEvent::NewRequest);
1347 })
1348 .ok();
1349
1350 let mut request_assistant_message_id = None;
1351
1352 while let Some(event) = events.next().await {
1353 if let Some((_, response_events)) = request_callback_parameters.as_mut() {
1354 response_events
1355 .push(event.as_ref().map_err(|error| error.to_string()).cloned());
1356 }
1357
1358 thread.update(cx, |thread, cx| {
1359 let event = match event {
1360 Ok(event) => event,
1361 Err(LanguageModelCompletionError::BadInputJson {
1362 id,
1363 tool_name,
1364 raw_input: invalid_input_json,
1365 json_parse_error,
1366 }) => {
1367 thread.receive_invalid_tool_json(
1368 id,
1369 tool_name,
1370 invalid_input_json,
1371 json_parse_error,
1372 window,
1373 cx,
1374 );
1375 return Ok(());
1376 }
1377 Err(LanguageModelCompletionError::Other(error)) => {
1378 return Err(error);
1379 }
1380 };
1381
1382 match event {
1383 LanguageModelCompletionEvent::StartMessage { .. } => {
1384 request_assistant_message_id =
1385 Some(thread.insert_assistant_message(
1386 vec![MessageSegment::Text(String::new())],
1387 cx,
1388 ));
1389 }
1390 LanguageModelCompletionEvent::Stop(reason) => {
1391 stop_reason = reason;
1392 }
1393 LanguageModelCompletionEvent::UsageUpdate(token_usage) => {
1394 thread.update_token_usage_at_last_message(token_usage);
1395 thread.cumulative_token_usage = thread.cumulative_token_usage
1396 + token_usage
1397 - current_token_usage;
1398 current_token_usage = token_usage;
1399 }
1400 LanguageModelCompletionEvent::Text(chunk) => {
1401 cx.emit(ThreadEvent::ReceivedTextChunk);
1402 if let Some(last_message) = thread.messages.last_mut() {
1403 if last_message.role == Role::Assistant
1404 && !thread.tool_use.has_tool_results(last_message.id)
1405 {
1406 last_message.push_text(&chunk);
1407 cx.emit(ThreadEvent::StreamedAssistantText(
1408 last_message.id,
1409 chunk,
1410 ));
1411 } else {
1412 // If we won't have an Assistant message yet, assume this chunk marks the beginning
1413 // of a new Assistant response.
1414 //
1415 // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
1416 // will result in duplicating the text of the chunk in the rendered Markdown.
1417 request_assistant_message_id =
1418 Some(thread.insert_assistant_message(
1419 vec![MessageSegment::Text(chunk.to_string())],
1420 cx,
1421 ));
1422 };
1423 }
1424 }
1425 LanguageModelCompletionEvent::Thinking {
1426 text: chunk,
1427 signature,
1428 } => {
1429 if let Some(last_message) = thread.messages.last_mut() {
1430 if last_message.role == Role::Assistant
1431 && !thread.tool_use.has_tool_results(last_message.id)
1432 {
1433 last_message.push_thinking(&chunk, signature);
1434 cx.emit(ThreadEvent::StreamedAssistantThinking(
1435 last_message.id,
1436 chunk,
1437 ));
1438 } else {
1439 // If we won't have an Assistant message yet, assume this chunk marks the beginning
1440 // of a new Assistant response.
1441 //
1442 // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
1443 // will result in duplicating the text of the chunk in the rendered Markdown.
1444 request_assistant_message_id =
1445 Some(thread.insert_assistant_message(
1446 vec![MessageSegment::Thinking {
1447 text: chunk.to_string(),
1448 signature,
1449 }],
1450 cx,
1451 ));
1452 };
1453 }
1454 }
1455 LanguageModelCompletionEvent::ToolUse(tool_use) => {
1456 let last_assistant_message_id = request_assistant_message_id
1457 .unwrap_or_else(|| {
1458 let new_assistant_message_id =
1459 thread.insert_assistant_message(vec![], cx);
1460 request_assistant_message_id =
1461 Some(new_assistant_message_id);
1462 new_assistant_message_id
1463 });
1464
1465 let tool_use_id = tool_use.id.clone();
1466 let streamed_input = if tool_use.is_input_complete {
1467 None
1468 } else {
1469 Some((&tool_use.input).clone())
1470 };
1471
1472 let ui_text = thread.tool_use.request_tool_use(
1473 last_assistant_message_id,
1474 tool_use,
1475 tool_use_metadata.clone(),
1476 cx,
1477 );
1478
1479 if let Some(input) = streamed_input {
1480 cx.emit(ThreadEvent::StreamedToolUse {
1481 tool_use_id,
1482 ui_text,
1483 input,
1484 });
1485 }
1486 }
1487 LanguageModelCompletionEvent::QueueUpdate(queue_event) => {
1488 if let Some(completion) = thread
1489 .pending_completions
1490 .iter_mut()
1491 .find(|completion| completion.id == pending_completion_id)
1492 {
1493 completion.queue_state = match queue_event {
1494 language_model::QueueState::Queued { position } => {
1495 QueueState::Queued { position }
1496 }
1497 language_model::QueueState::Started => QueueState::Started,
1498 }
1499 }
1500 }
1501 }
1502
1503 thread.touch_updated_at();
1504 cx.emit(ThreadEvent::StreamedCompletion);
1505 cx.notify();
1506
1507 thread.auto_capture_telemetry(cx);
1508 Ok(())
1509 })??;
1510
1511 smol::future::yield_now().await;
1512 }
1513
1514 thread.update(cx, |thread, cx| {
1515 thread
1516 .pending_completions
1517 .retain(|completion| completion.id != pending_completion_id);
1518
1519 // If there is a response without tool use, summarize the message. Otherwise,
1520 // allow two tool uses before summarizing.
1521 if thread.summary.is_none()
1522 && thread.messages.len() >= 2
1523 && (!thread.has_pending_tool_uses() || thread.messages.len() >= 6)
1524 {
1525 thread.summarize(cx);
1526 }
1527 })?;
1528
1529 anyhow::Ok(stop_reason)
1530 };
1531
1532 let result = stream_completion.await;
1533
1534 thread
1535 .update(cx, |thread, cx| {
1536 thread.finalize_pending_checkpoint(cx);
1537 match result.as_ref() {
1538 Ok(stop_reason) => match stop_reason {
1539 StopReason::ToolUse => {
1540 let tool_uses = thread.use_pending_tools(window, cx, model.clone());
1541 cx.emit(ThreadEvent::UsePendingTools { tool_uses });
1542 }
1543 StopReason::EndTurn => {}
1544 StopReason::MaxTokens => {}
1545 },
1546 Err(error) => {
1547 if error.is::<PaymentRequiredError>() {
1548 cx.emit(ThreadEvent::ShowError(ThreadError::PaymentRequired));
1549 } else if error.is::<MaxMonthlySpendReachedError>() {
1550 cx.emit(ThreadEvent::ShowError(
1551 ThreadError::MaxMonthlySpendReached,
1552 ));
1553 } else if let Some(error) =
1554 error.downcast_ref::<ModelRequestLimitReachedError>()
1555 {
1556 cx.emit(ThreadEvent::ShowError(
1557 ThreadError::ModelRequestLimitReached { plan: error.plan },
1558 ));
1559 } else if let Some(known_error) =
1560 error.downcast_ref::<LanguageModelKnownError>()
1561 {
1562 match known_error {
1563 LanguageModelKnownError::ContextWindowLimitExceeded {
1564 tokens,
1565 } => {
1566 thread.exceeded_window_error = Some(ExceededWindowError {
1567 model_id: model.id(),
1568 token_count: *tokens,
1569 });
1570 cx.notify();
1571 }
1572 }
1573 } else {
1574 let error_message = error
1575 .chain()
1576 .map(|err| err.to_string())
1577 .collect::<Vec<_>>()
1578 .join("\n");
1579 cx.emit(ThreadEvent::ShowError(ThreadError::Message {
1580 header: "Error interacting with language model".into(),
1581 message: SharedString::from(error_message.clone()),
1582 }));
1583 }
1584
1585 thread.cancel_last_completion(window, cx);
1586 }
1587 }
1588 cx.emit(ThreadEvent::Stopped(result.map_err(Arc::new)));
1589
1590 if let Some((request_callback, (request, response_events))) = thread
1591 .request_callback
1592 .as_mut()
1593 .zip(request_callback_parameters.as_ref())
1594 {
1595 request_callback(request, response_events);
1596 }
1597
1598 thread.auto_capture_telemetry(cx);
1599
1600 if let Ok(initial_usage) = initial_token_usage {
1601 let usage = thread.cumulative_token_usage - initial_usage;
1602
1603 telemetry::event!(
1604 "Assistant Thread Completion",
1605 thread_id = thread.id().to_string(),
1606 prompt_id = prompt_id,
1607 model = model.telemetry_id(),
1608 model_provider = model.provider_id().to_string(),
1609 input_tokens = usage.input_tokens,
1610 output_tokens = usage.output_tokens,
1611 cache_creation_input_tokens = usage.cache_creation_input_tokens,
1612 cache_read_input_tokens = usage.cache_read_input_tokens,
1613 );
1614 }
1615 })
1616 .ok();
1617 });
1618
1619 self.pending_completions.push(PendingCompletion {
1620 id: pending_completion_id,
1621 queue_state: QueueState::Sending,
1622 _task: task,
1623 });
1624 }
1625
1626 pub fn summarize(&mut self, cx: &mut Context<Self>) {
1627 let Some(model) = LanguageModelRegistry::read_global(cx).thread_summary_model() else {
1628 return;
1629 };
1630
1631 if !model.provider.is_authenticated(cx) {
1632 return;
1633 }
1634
1635 let added_user_message = "Generate a concise 3-7 word title for this conversation, omitting punctuation. \
1636 Go straight to the title, without any preamble and prefix like `Here's a concise suggestion:...` or `Title:`. \
1637 If the conversation is about a specific subject, include it in the title. \
1638 Be descriptive. DO NOT speak in the first person.";
1639
1640 let request = self.to_summarize_request(added_user_message.into());
1641
1642 self.pending_summary = cx.spawn(async move |this, cx| {
1643 async move {
1644 let stream = model.model.stream_completion_text_with_usage(request, &cx);
1645 let (mut messages, usage) = stream.await?;
1646
1647 if let Some(usage) = usage {
1648 this.update(cx, |_thread, cx| {
1649 cx.emit(ThreadEvent::UsageUpdated(usage));
1650 })
1651 .ok();
1652 }
1653
1654 let mut new_summary = String::new();
1655 while let Some(message) = messages.stream.next().await {
1656 let text = message?;
1657 let mut lines = text.lines();
1658 new_summary.extend(lines.next());
1659
1660 // Stop if the LLM generated multiple lines.
1661 if lines.next().is_some() {
1662 break;
1663 }
1664 }
1665
1666 this.update(cx, |this, cx| {
1667 if !new_summary.is_empty() {
1668 this.summary = Some(new_summary.into());
1669 }
1670
1671 cx.emit(ThreadEvent::SummaryGenerated);
1672 })?;
1673
1674 anyhow::Ok(())
1675 }
1676 .log_err()
1677 .await
1678 });
1679 }
1680
1681 pub fn start_generating_detailed_summary_if_needed(
1682 &mut self,
1683 thread_store: WeakEntity<ThreadStore>,
1684 cx: &mut Context<Self>,
1685 ) {
1686 let Some(last_message_id) = self.messages.last().map(|message| message.id) else {
1687 return;
1688 };
1689
1690 match &*self.detailed_summary_rx.borrow() {
1691 DetailedSummaryState::Generating { message_id, .. }
1692 | DetailedSummaryState::Generated { message_id, .. }
1693 if *message_id == last_message_id =>
1694 {
1695 // Already up-to-date
1696 return;
1697 }
1698 _ => {}
1699 }
1700
1701 let Some(ConfiguredModel { model, provider }) =
1702 LanguageModelRegistry::read_global(cx).thread_summary_model()
1703 else {
1704 return;
1705 };
1706
1707 if !provider.is_authenticated(cx) {
1708 return;
1709 }
1710
1711 let added_user_message = "Generate a detailed summary of this conversation. Include:\n\
1712 1. A brief overview of what was discussed\n\
1713 2. Key facts or information discovered\n\
1714 3. Outcomes or conclusions reached\n\
1715 4. Any action items or next steps if any\n\
1716 Format it in Markdown with headings and bullet points.";
1717
1718 let request = self.to_summarize_request(added_user_message.into());
1719
1720 *self.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generating {
1721 message_id: last_message_id,
1722 };
1723
1724 // Replace the detailed summarization task if there is one, cancelling it. It would probably
1725 // be better to allow the old task to complete, but this would require logic for choosing
1726 // which result to prefer (the old task could complete after the new one, resulting in a
1727 // stale summary).
1728 self.detailed_summary_task = cx.spawn(async move |thread, cx| {
1729 let stream = model.stream_completion_text(request, &cx);
1730 let Some(mut messages) = stream.await.log_err() else {
1731 thread
1732 .update(cx, |thread, _cx| {
1733 *thread.detailed_summary_tx.borrow_mut() =
1734 DetailedSummaryState::NotGenerated;
1735 })
1736 .ok()?;
1737 return None;
1738 };
1739
1740 let mut new_detailed_summary = String::new();
1741
1742 while let Some(chunk) = messages.stream.next().await {
1743 if let Some(chunk) = chunk.log_err() {
1744 new_detailed_summary.push_str(&chunk);
1745 }
1746 }
1747
1748 thread
1749 .update(cx, |thread, _cx| {
1750 *thread.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generated {
1751 text: new_detailed_summary.into(),
1752 message_id: last_message_id,
1753 };
1754 })
1755 .ok()?;
1756
1757 // Save thread so its summary can be reused later
1758 if let Some(thread) = thread.upgrade() {
1759 if let Ok(Ok(save_task)) = cx.update(|cx| {
1760 thread_store
1761 .update(cx, |thread_store, cx| thread_store.save_thread(&thread, cx))
1762 }) {
1763 save_task.await.log_err();
1764 }
1765 }
1766
1767 Some(())
1768 });
1769 }
1770
1771 pub async fn wait_for_detailed_summary_or_text(
1772 this: &Entity<Self>,
1773 cx: &mut AsyncApp,
1774 ) -> Option<SharedString> {
1775 let mut detailed_summary_rx = this
1776 .read_with(cx, |this, _cx| this.detailed_summary_rx.clone())
1777 .ok()?;
1778 loop {
1779 match detailed_summary_rx.recv().await? {
1780 DetailedSummaryState::Generating { .. } => {}
1781 DetailedSummaryState::NotGenerated => {
1782 return this.read_with(cx, |this, _cx| this.text().into()).ok();
1783 }
1784 DetailedSummaryState::Generated { text, .. } => return Some(text),
1785 }
1786 }
1787 }
1788
1789 pub fn latest_detailed_summary_or_text(&self) -> SharedString {
1790 self.detailed_summary_rx
1791 .borrow()
1792 .text()
1793 .unwrap_or_else(|| self.text().into())
1794 }
1795
1796 pub fn is_generating_detailed_summary(&self) -> bool {
1797 matches!(
1798 &*self.detailed_summary_rx.borrow(),
1799 DetailedSummaryState::Generating { .. }
1800 )
1801 }
1802
1803 pub fn use_pending_tools(
1804 &mut self,
1805 window: Option<AnyWindowHandle>,
1806 cx: &mut Context<Self>,
1807 model: Arc<dyn LanguageModel>,
1808 ) -> Vec<PendingToolUse> {
1809 self.auto_capture_telemetry(cx);
1810 let request = self.to_completion_request(model, cx);
1811 let messages = Arc::new(request.messages);
1812 let pending_tool_uses = self
1813 .tool_use
1814 .pending_tool_uses()
1815 .into_iter()
1816 .filter(|tool_use| tool_use.status.is_idle())
1817 .cloned()
1818 .collect::<Vec<_>>();
1819
1820 for tool_use in pending_tool_uses.iter() {
1821 if let Some(tool) = self.tools.read(cx).tool(&tool_use.name, cx) {
1822 if tool.needs_confirmation(&tool_use.input, cx)
1823 && !AssistantSettings::get_global(cx).always_allow_tool_actions
1824 {
1825 self.tool_use.confirm_tool_use(
1826 tool_use.id.clone(),
1827 tool_use.ui_text.clone(),
1828 tool_use.input.clone(),
1829 messages.clone(),
1830 tool,
1831 );
1832 cx.emit(ThreadEvent::ToolConfirmationNeeded);
1833 } else {
1834 self.run_tool(
1835 tool_use.id.clone(),
1836 tool_use.ui_text.clone(),
1837 tool_use.input.clone(),
1838 &messages,
1839 tool,
1840 window,
1841 cx,
1842 );
1843 }
1844 }
1845 }
1846
1847 pending_tool_uses
1848 }
1849
1850 pub fn receive_invalid_tool_json(
1851 &mut self,
1852 tool_use_id: LanguageModelToolUseId,
1853 tool_name: Arc<str>,
1854 invalid_json: Arc<str>,
1855 error: String,
1856 window: Option<AnyWindowHandle>,
1857 cx: &mut Context<Thread>,
1858 ) {
1859 log::error!("The model returned invalid input JSON: {invalid_json}");
1860
1861 let pending_tool_use = self.tool_use.insert_tool_output(
1862 tool_use_id.clone(),
1863 tool_name,
1864 Err(anyhow!("Error parsing input JSON: {error}")),
1865 self.configured_model.as_ref(),
1866 );
1867 let ui_text = if let Some(pending_tool_use) = &pending_tool_use {
1868 pending_tool_use.ui_text.clone()
1869 } else {
1870 log::error!(
1871 "There was no pending tool use for tool use {tool_use_id}, even though it finished (with invalid input JSON)."
1872 );
1873 format!("Unknown tool {}", tool_use_id).into()
1874 };
1875
1876 cx.emit(ThreadEvent::InvalidToolInput {
1877 tool_use_id: tool_use_id.clone(),
1878 ui_text,
1879 invalid_input_json: invalid_json,
1880 });
1881
1882 self.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
1883 }
1884
1885 pub fn run_tool(
1886 &mut self,
1887 tool_use_id: LanguageModelToolUseId,
1888 ui_text: impl Into<SharedString>,
1889 input: serde_json::Value,
1890 messages: &[LanguageModelRequestMessage],
1891 tool: Arc<dyn Tool>,
1892 window: Option<AnyWindowHandle>,
1893 cx: &mut Context<Thread>,
1894 ) {
1895 let task = self.spawn_tool_use(tool_use_id.clone(), messages, input, tool, window, cx);
1896 self.tool_use
1897 .run_pending_tool(tool_use_id, ui_text.into(), task);
1898 }
1899
1900 fn spawn_tool_use(
1901 &mut self,
1902 tool_use_id: LanguageModelToolUseId,
1903 messages: &[LanguageModelRequestMessage],
1904 input: serde_json::Value,
1905 tool: Arc<dyn Tool>,
1906 window: Option<AnyWindowHandle>,
1907 cx: &mut Context<Thread>,
1908 ) -> Task<()> {
1909 let tool_name: Arc<str> = tool.name().into();
1910
1911 let tool_result = if self.tools.read(cx).is_disabled(&tool.source(), &tool_name) {
1912 Task::ready(Err(anyhow!("tool is disabled: {tool_name}"))).into()
1913 } else {
1914 tool.run(
1915 input,
1916 messages,
1917 self.project.clone(),
1918 self.action_log.clone(),
1919 window,
1920 cx,
1921 )
1922 };
1923
1924 // Store the card separately if it exists
1925 if let Some(card) = tool_result.card.clone() {
1926 self.tool_use
1927 .insert_tool_result_card(tool_use_id.clone(), card);
1928 }
1929
1930 cx.spawn({
1931 async move |thread: WeakEntity<Thread>, cx| {
1932 let output = tool_result.output.await;
1933
1934 thread
1935 .update(cx, |thread, cx| {
1936 let pending_tool_use = thread.tool_use.insert_tool_output(
1937 tool_use_id.clone(),
1938 tool_name,
1939 output,
1940 thread.configured_model.as_ref(),
1941 );
1942 thread.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
1943 })
1944 .ok();
1945 }
1946 })
1947 }
1948
1949 fn tool_finished(
1950 &mut self,
1951 tool_use_id: LanguageModelToolUseId,
1952 pending_tool_use: Option<PendingToolUse>,
1953 canceled: bool,
1954 window: Option<AnyWindowHandle>,
1955 cx: &mut Context<Self>,
1956 ) {
1957 if self.all_tools_finished() {
1958 if let Some(ConfiguredModel { model, .. }) = self.configured_model.as_ref() {
1959 if !canceled {
1960 self.send_to_model(model.clone(), window, cx);
1961 }
1962 self.auto_capture_telemetry(cx);
1963 }
1964 }
1965
1966 cx.emit(ThreadEvent::ToolFinished {
1967 tool_use_id,
1968 pending_tool_use,
1969 });
1970 }
1971
1972 /// Cancels the last pending completion, if there are any pending.
1973 ///
1974 /// Returns whether a completion was canceled.
1975 pub fn cancel_last_completion(
1976 &mut self,
1977 window: Option<AnyWindowHandle>,
1978 cx: &mut Context<Self>,
1979 ) -> bool {
1980 let mut canceled = self.pending_completions.pop().is_some();
1981
1982 for pending_tool_use in self.tool_use.cancel_pending() {
1983 canceled = true;
1984 self.tool_finished(
1985 pending_tool_use.id.clone(),
1986 Some(pending_tool_use),
1987 true,
1988 window,
1989 cx,
1990 );
1991 }
1992
1993 self.finalize_pending_checkpoint(cx);
1994
1995 if canceled {
1996 cx.emit(ThreadEvent::CompletionCanceled);
1997 }
1998
1999 canceled
2000 }
2001
2002 /// Signals that any in-progress editing should be canceled.
2003 ///
2004 /// This method is used to notify listeners (like ActiveThread) that
2005 /// they should cancel any editing operations.
2006 pub fn cancel_editing(&mut self, cx: &mut Context<Self>) {
2007 cx.emit(ThreadEvent::CancelEditing);
2008 }
2009
2010 pub fn feedback(&self) -> Option<ThreadFeedback> {
2011 self.feedback
2012 }
2013
2014 pub fn message_feedback(&self, message_id: MessageId) -> Option<ThreadFeedback> {
2015 self.message_feedback.get(&message_id).copied()
2016 }
2017
2018 pub fn report_message_feedback(
2019 &mut self,
2020 message_id: MessageId,
2021 feedback: ThreadFeedback,
2022 cx: &mut Context<Self>,
2023 ) -> Task<Result<()>> {
2024 if self.message_feedback.get(&message_id) == Some(&feedback) {
2025 return Task::ready(Ok(()));
2026 }
2027
2028 let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
2029 let serialized_thread = self.serialize(cx);
2030 let thread_id = self.id().clone();
2031 let client = self.project.read(cx).client();
2032
2033 let enabled_tool_names: Vec<String> = self
2034 .tools()
2035 .read(cx)
2036 .enabled_tools(cx)
2037 .iter()
2038 .map(|tool| tool.name().to_string())
2039 .collect();
2040
2041 self.message_feedback.insert(message_id, feedback);
2042
2043 cx.notify();
2044
2045 let message_content = self
2046 .message(message_id)
2047 .map(|msg| msg.to_string())
2048 .unwrap_or_default();
2049
2050 cx.background_spawn(async move {
2051 let final_project_snapshot = final_project_snapshot.await;
2052 let serialized_thread = serialized_thread.await?;
2053 let thread_data =
2054 serde_json::to_value(serialized_thread).unwrap_or_else(|_| serde_json::Value::Null);
2055
2056 let rating = match feedback {
2057 ThreadFeedback::Positive => "positive",
2058 ThreadFeedback::Negative => "negative",
2059 };
2060 telemetry::event!(
2061 "Assistant Thread Rated",
2062 rating,
2063 thread_id,
2064 enabled_tool_names,
2065 message_id = message_id.0,
2066 message_content,
2067 thread_data,
2068 final_project_snapshot
2069 );
2070 client.telemetry().flush_events().await;
2071
2072 Ok(())
2073 })
2074 }
2075
2076 pub fn report_feedback(
2077 &mut self,
2078 feedback: ThreadFeedback,
2079 cx: &mut Context<Self>,
2080 ) -> Task<Result<()>> {
2081 let last_assistant_message_id = self
2082 .messages
2083 .iter()
2084 .rev()
2085 .find(|msg| msg.role == Role::Assistant)
2086 .map(|msg| msg.id);
2087
2088 if let Some(message_id) = last_assistant_message_id {
2089 self.report_message_feedback(message_id, feedback, cx)
2090 } else {
2091 let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
2092 let serialized_thread = self.serialize(cx);
2093 let thread_id = self.id().clone();
2094 let client = self.project.read(cx).client();
2095 self.feedback = Some(feedback);
2096 cx.notify();
2097
2098 cx.background_spawn(async move {
2099 let final_project_snapshot = final_project_snapshot.await;
2100 let serialized_thread = serialized_thread.await?;
2101 let thread_data = serde_json::to_value(serialized_thread)
2102 .unwrap_or_else(|_| serde_json::Value::Null);
2103
2104 let rating = match feedback {
2105 ThreadFeedback::Positive => "positive",
2106 ThreadFeedback::Negative => "negative",
2107 };
2108 telemetry::event!(
2109 "Assistant Thread Rated",
2110 rating,
2111 thread_id,
2112 thread_data,
2113 final_project_snapshot
2114 );
2115 client.telemetry().flush_events().await;
2116
2117 Ok(())
2118 })
2119 }
2120 }
2121
2122 /// Create a snapshot of the current project state including git information and unsaved buffers.
2123 fn project_snapshot(
2124 project: Entity<Project>,
2125 cx: &mut Context<Self>,
2126 ) -> Task<Arc<ProjectSnapshot>> {
2127 let git_store = project.read(cx).git_store().clone();
2128 let worktree_snapshots: Vec<_> = project
2129 .read(cx)
2130 .visible_worktrees(cx)
2131 .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
2132 .collect();
2133
2134 cx.spawn(async move |_, cx| {
2135 let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
2136
2137 let mut unsaved_buffers = Vec::new();
2138 cx.update(|app_cx| {
2139 let buffer_store = project.read(app_cx).buffer_store();
2140 for buffer_handle in buffer_store.read(app_cx).buffers() {
2141 let buffer = buffer_handle.read(app_cx);
2142 if buffer.is_dirty() {
2143 if let Some(file) = buffer.file() {
2144 let path = file.path().to_string_lossy().to_string();
2145 unsaved_buffers.push(path);
2146 }
2147 }
2148 }
2149 })
2150 .ok();
2151
2152 Arc::new(ProjectSnapshot {
2153 worktree_snapshots,
2154 unsaved_buffer_paths: unsaved_buffers,
2155 timestamp: Utc::now(),
2156 })
2157 })
2158 }
2159
2160 fn worktree_snapshot(
2161 worktree: Entity<project::Worktree>,
2162 git_store: Entity<GitStore>,
2163 cx: &App,
2164 ) -> Task<WorktreeSnapshot> {
2165 cx.spawn(async move |cx| {
2166 // Get worktree path and snapshot
2167 let worktree_info = cx.update(|app_cx| {
2168 let worktree = worktree.read(app_cx);
2169 let path = worktree.abs_path().to_string_lossy().to_string();
2170 let snapshot = worktree.snapshot();
2171 (path, snapshot)
2172 });
2173
2174 let Ok((worktree_path, _snapshot)) = worktree_info else {
2175 return WorktreeSnapshot {
2176 worktree_path: String::new(),
2177 git_state: None,
2178 };
2179 };
2180
2181 let git_state = git_store
2182 .update(cx, |git_store, cx| {
2183 git_store
2184 .repositories()
2185 .values()
2186 .find(|repo| {
2187 repo.read(cx)
2188 .abs_path_to_repo_path(&worktree.read(cx).abs_path())
2189 .is_some()
2190 })
2191 .cloned()
2192 })
2193 .ok()
2194 .flatten()
2195 .map(|repo| {
2196 repo.update(cx, |repo, _| {
2197 let current_branch =
2198 repo.branch.as_ref().map(|branch| branch.name().to_owned());
2199 repo.send_job(None, |state, _| async move {
2200 let RepositoryState::Local { backend, .. } = state else {
2201 return GitState {
2202 remote_url: None,
2203 head_sha: None,
2204 current_branch,
2205 diff: None,
2206 };
2207 };
2208
2209 let remote_url = backend.remote_url("origin");
2210 let head_sha = backend.head_sha().await;
2211 let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
2212
2213 GitState {
2214 remote_url,
2215 head_sha,
2216 current_branch,
2217 diff,
2218 }
2219 })
2220 })
2221 });
2222
2223 let git_state = match git_state {
2224 Some(git_state) => match git_state.ok() {
2225 Some(git_state) => git_state.await.ok(),
2226 None => None,
2227 },
2228 None => None,
2229 };
2230
2231 WorktreeSnapshot {
2232 worktree_path,
2233 git_state,
2234 }
2235 })
2236 }
2237
2238 pub fn to_markdown(&self, cx: &App) -> Result<String> {
2239 let mut markdown = Vec::new();
2240
2241 if let Some(summary) = self.summary() {
2242 writeln!(markdown, "# {summary}\n")?;
2243 };
2244
2245 for message in self.messages() {
2246 writeln!(
2247 markdown,
2248 "## {role}\n",
2249 role = match message.role {
2250 Role::User => "User",
2251 Role::Assistant => "Assistant",
2252 Role::System => "System",
2253 }
2254 )?;
2255
2256 if !message.loaded_context.text.is_empty() {
2257 writeln!(markdown, "{}", message.loaded_context.text)?;
2258 }
2259
2260 if !message.loaded_context.images.is_empty() {
2261 writeln!(
2262 markdown,
2263 "\n{} images attached as context.\n",
2264 message.loaded_context.images.len()
2265 )?;
2266 }
2267
2268 for segment in &message.segments {
2269 match segment {
2270 MessageSegment::Text(text) => writeln!(markdown, "{}\n", text)?,
2271 MessageSegment::Thinking { text, .. } => {
2272 writeln!(markdown, "<think>\n{}\n</think>\n", text)?
2273 }
2274 MessageSegment::RedactedThinking(_) => {}
2275 }
2276 }
2277
2278 for tool_use in self.tool_uses_for_message(message.id, cx) {
2279 writeln!(
2280 markdown,
2281 "**Use Tool: {} ({})**",
2282 tool_use.name, tool_use.id
2283 )?;
2284 writeln!(markdown, "```json")?;
2285 writeln!(
2286 markdown,
2287 "{}",
2288 serde_json::to_string_pretty(&tool_use.input)?
2289 )?;
2290 writeln!(markdown, "```")?;
2291 }
2292
2293 for tool_result in self.tool_results_for_message(message.id) {
2294 write!(markdown, "\n**Tool Results: {}", tool_result.tool_use_id)?;
2295 if tool_result.is_error {
2296 write!(markdown, " (Error)")?;
2297 }
2298
2299 writeln!(markdown, "**\n")?;
2300 writeln!(markdown, "{}", tool_result.content)?;
2301 }
2302 }
2303
2304 Ok(String::from_utf8_lossy(&markdown).to_string())
2305 }
2306
2307 pub fn keep_edits_in_range(
2308 &mut self,
2309 buffer: Entity<language::Buffer>,
2310 buffer_range: Range<language::Anchor>,
2311 cx: &mut Context<Self>,
2312 ) {
2313 self.action_log.update(cx, |action_log, cx| {
2314 action_log.keep_edits_in_range(buffer, buffer_range, cx)
2315 });
2316 }
2317
2318 pub fn keep_all_edits(&mut self, cx: &mut Context<Self>) {
2319 self.action_log
2320 .update(cx, |action_log, cx| action_log.keep_all_edits(cx));
2321 }
2322
2323 pub fn reject_edits_in_ranges(
2324 &mut self,
2325 buffer: Entity<language::Buffer>,
2326 buffer_ranges: Vec<Range<language::Anchor>>,
2327 cx: &mut Context<Self>,
2328 ) -> Task<Result<()>> {
2329 self.action_log.update(cx, |action_log, cx| {
2330 action_log.reject_edits_in_ranges(buffer, buffer_ranges, cx)
2331 })
2332 }
2333
2334 pub fn action_log(&self) -> &Entity<ActionLog> {
2335 &self.action_log
2336 }
2337
2338 pub fn project(&self) -> &Entity<Project> {
2339 &self.project
2340 }
2341
2342 pub fn auto_capture_telemetry(&mut self, cx: &mut Context<Self>) {
2343 if !cx.has_flag::<feature_flags::ThreadAutoCaptureFeatureFlag>() {
2344 return;
2345 }
2346
2347 let now = Instant::now();
2348 if let Some(last) = self.last_auto_capture_at {
2349 if now.duration_since(last).as_secs() < 10 {
2350 return;
2351 }
2352 }
2353
2354 self.last_auto_capture_at = Some(now);
2355
2356 let thread_id = self.id().clone();
2357 let github_login = self
2358 .project
2359 .read(cx)
2360 .user_store()
2361 .read(cx)
2362 .current_user()
2363 .map(|user| user.github_login.clone());
2364 let client = self.project.read(cx).client().clone();
2365 let serialize_task = self.serialize(cx);
2366
2367 cx.background_executor()
2368 .spawn(async move {
2369 if let Ok(serialized_thread) = serialize_task.await {
2370 if let Ok(thread_data) = serde_json::to_value(serialized_thread) {
2371 telemetry::event!(
2372 "Agent Thread Auto-Captured",
2373 thread_id = thread_id.to_string(),
2374 thread_data = thread_data,
2375 auto_capture_reason = "tracked_user",
2376 github_login = github_login
2377 );
2378
2379 client.telemetry().flush_events().await;
2380 }
2381 }
2382 })
2383 .detach();
2384 }
2385
2386 pub fn cumulative_token_usage(&self) -> TokenUsage {
2387 self.cumulative_token_usage
2388 }
2389
2390 pub fn token_usage_up_to_message(&self, message_id: MessageId) -> TotalTokenUsage {
2391 let Some(model) = self.configured_model.as_ref() else {
2392 return TotalTokenUsage::default();
2393 };
2394
2395 let max = model.model.max_token_count();
2396
2397 let index = self
2398 .messages
2399 .iter()
2400 .position(|msg| msg.id == message_id)
2401 .unwrap_or(0);
2402
2403 if index == 0 {
2404 return TotalTokenUsage { total: 0, max };
2405 }
2406
2407 let token_usage = &self
2408 .request_token_usage
2409 .get(index - 1)
2410 .cloned()
2411 .unwrap_or_default();
2412
2413 TotalTokenUsage {
2414 total: token_usage.total_tokens() as usize,
2415 max,
2416 }
2417 }
2418
2419 pub fn total_token_usage(&self) -> Option<TotalTokenUsage> {
2420 let model = self.configured_model.as_ref()?;
2421
2422 let max = model.model.max_token_count();
2423
2424 if let Some(exceeded_error) = &self.exceeded_window_error {
2425 if model.model.id() == exceeded_error.model_id {
2426 return Some(TotalTokenUsage {
2427 total: exceeded_error.token_count,
2428 max,
2429 });
2430 }
2431 }
2432
2433 let total = self
2434 .token_usage_at_last_message()
2435 .unwrap_or_default()
2436 .total_tokens() as usize;
2437
2438 Some(TotalTokenUsage { total, max })
2439 }
2440
2441 fn token_usage_at_last_message(&self) -> Option<TokenUsage> {
2442 self.request_token_usage
2443 .get(self.messages.len().saturating_sub(1))
2444 .or_else(|| self.request_token_usage.last())
2445 .cloned()
2446 }
2447
2448 fn update_token_usage_at_last_message(&mut self, token_usage: TokenUsage) {
2449 let placeholder = self.token_usage_at_last_message().unwrap_or_default();
2450 self.request_token_usage
2451 .resize(self.messages.len(), placeholder);
2452
2453 if let Some(last) = self.request_token_usage.last_mut() {
2454 *last = token_usage;
2455 }
2456 }
2457
2458 pub fn deny_tool_use(
2459 &mut self,
2460 tool_use_id: LanguageModelToolUseId,
2461 tool_name: Arc<str>,
2462 window: Option<AnyWindowHandle>,
2463 cx: &mut Context<Self>,
2464 ) {
2465 let err = Err(anyhow::anyhow!(
2466 "Permission to run tool action denied by user"
2467 ));
2468
2469 self.tool_use.insert_tool_output(
2470 tool_use_id.clone(),
2471 tool_name,
2472 err,
2473 self.configured_model.as_ref(),
2474 );
2475 self.tool_finished(tool_use_id.clone(), None, true, window, cx);
2476 }
2477}
2478
2479#[derive(Debug, Clone, Error)]
2480pub enum ThreadError {
2481 #[error("Payment required")]
2482 PaymentRequired,
2483 #[error("Max monthly spend reached")]
2484 MaxMonthlySpendReached,
2485 #[error("Model request limit reached")]
2486 ModelRequestLimitReached { plan: Plan },
2487 #[error("Message {header}: {message}")]
2488 Message {
2489 header: SharedString,
2490 message: SharedString,
2491 },
2492}
2493
2494#[derive(Debug, Clone)]
2495pub enum ThreadEvent {
2496 ShowError(ThreadError),
2497 UsageUpdated(RequestUsage),
2498 StreamedCompletion,
2499 ReceivedTextChunk,
2500 NewRequest,
2501 StreamedAssistantText(MessageId, String),
2502 StreamedAssistantThinking(MessageId, String),
2503 StreamedToolUse {
2504 tool_use_id: LanguageModelToolUseId,
2505 ui_text: Arc<str>,
2506 input: serde_json::Value,
2507 },
2508 InvalidToolInput {
2509 tool_use_id: LanguageModelToolUseId,
2510 ui_text: Arc<str>,
2511 invalid_input_json: Arc<str>,
2512 },
2513 Stopped(Result<StopReason, Arc<anyhow::Error>>),
2514 MessageAdded(MessageId),
2515 MessageEdited(MessageId),
2516 MessageDeleted(MessageId),
2517 SummaryGenerated,
2518 SummaryChanged,
2519 UsePendingTools {
2520 tool_uses: Vec<PendingToolUse>,
2521 },
2522 ToolFinished {
2523 #[allow(unused)]
2524 tool_use_id: LanguageModelToolUseId,
2525 /// The pending tool use that corresponds to this tool.
2526 pending_tool_use: Option<PendingToolUse>,
2527 },
2528 CheckpointChanged,
2529 ToolConfirmationNeeded,
2530 CancelEditing,
2531 CompletionCanceled,
2532}
2533
2534impl EventEmitter<ThreadEvent> for Thread {}
2535
2536struct PendingCompletion {
2537 id: usize,
2538 queue_state: QueueState,
2539 _task: Task<()>,
2540}
2541
2542#[cfg(test)]
2543mod tests {
2544 use super::*;
2545 use crate::{ThreadStore, context::load_context, context_store::ContextStore, thread_store};
2546 use assistant_settings::AssistantSettings;
2547 use assistant_tool::ToolRegistry;
2548 use context_server::ContextServerSettings;
2549 use editor::EditorSettings;
2550 use gpui::TestAppContext;
2551 use language_model::fake_provider::FakeLanguageModel;
2552 use project::{FakeFs, Project};
2553 use prompt_store::PromptBuilder;
2554 use serde_json::json;
2555 use settings::{Settings, SettingsStore};
2556 use std::sync::Arc;
2557 use theme::ThemeSettings;
2558 use util::path;
2559 use workspace::Workspace;
2560
2561 #[gpui::test]
2562 async fn test_message_with_context(cx: &mut TestAppContext) {
2563 init_test_settings(cx);
2564
2565 let project = create_test_project(
2566 cx,
2567 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2568 )
2569 .await;
2570
2571 let (_workspace, _thread_store, thread, context_store, model) =
2572 setup_test_environment(cx, project.clone()).await;
2573
2574 add_file_to_context(&project, &context_store, "test/code.rs", cx)
2575 .await
2576 .unwrap();
2577
2578 let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
2579 let loaded_context = cx
2580 .update(|cx| load_context(vec![context], &project, &None, cx))
2581 .await;
2582
2583 // Insert user message with context
2584 let message_id = thread.update(cx, |thread, cx| {
2585 thread.insert_user_message(
2586 "Please explain this code",
2587 loaded_context,
2588 None,
2589 Vec::new(),
2590 cx,
2591 )
2592 });
2593
2594 // Check content and context in message object
2595 let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());
2596
2597 // Use different path format strings based on platform for the test
2598 #[cfg(windows)]
2599 let path_part = r"test\code.rs";
2600 #[cfg(not(windows))]
2601 let path_part = "test/code.rs";
2602
2603 let expected_context = format!(
2604 r#"
2605<context>
2606The following items were attached by the user. They are up-to-date and don't need to be re-read.
2607
2608<files>
2609```rs {path_part}
2610fn main() {{
2611 println!("Hello, world!");
2612}}
2613```
2614</files>
2615</context>
2616"#
2617 );
2618
2619 assert_eq!(message.role, Role::User);
2620 assert_eq!(message.segments.len(), 1);
2621 assert_eq!(
2622 message.segments[0],
2623 MessageSegment::Text("Please explain this code".to_string())
2624 );
2625 assert_eq!(message.loaded_context.text, expected_context);
2626
2627 // Check message in request
2628 let request = thread.update(cx, |thread, cx| {
2629 thread.to_completion_request(model.clone(), cx)
2630 });
2631
2632 assert_eq!(request.messages.len(), 2);
2633 let expected_full_message = format!("{}Please explain this code", expected_context);
2634 assert_eq!(request.messages[1].string_contents(), expected_full_message);
2635 }
2636
2637 #[gpui::test]
2638 async fn test_only_include_new_contexts(cx: &mut TestAppContext) {
2639 init_test_settings(cx);
2640
2641 let project = create_test_project(
2642 cx,
2643 json!({
2644 "file1.rs": "fn function1() {}\n",
2645 "file2.rs": "fn function2() {}\n",
2646 "file3.rs": "fn function3() {}\n",
2647 "file4.rs": "fn function4() {}\n",
2648 }),
2649 )
2650 .await;
2651
2652 let (_, _thread_store, thread, context_store, model) =
2653 setup_test_environment(cx, project.clone()).await;
2654
2655 // First message with context 1
2656 add_file_to_context(&project, &context_store, "test/file1.rs", cx)
2657 .await
2658 .unwrap();
2659 let new_contexts = context_store.update(cx, |store, cx| {
2660 store.new_context_for_thread(thread.read(cx), None)
2661 });
2662 assert_eq!(new_contexts.len(), 1);
2663 let loaded_context = cx
2664 .update(|cx| load_context(new_contexts, &project, &None, cx))
2665 .await;
2666 let message1_id = thread.update(cx, |thread, cx| {
2667 thread.insert_user_message("Message 1", loaded_context, None, Vec::new(), cx)
2668 });
2669
2670 // Second message with contexts 1 and 2 (context 1 should be skipped as it's already included)
2671 add_file_to_context(&project, &context_store, "test/file2.rs", cx)
2672 .await
2673 .unwrap();
2674 let new_contexts = context_store.update(cx, |store, cx| {
2675 store.new_context_for_thread(thread.read(cx), None)
2676 });
2677 assert_eq!(new_contexts.len(), 1);
2678 let loaded_context = cx
2679 .update(|cx| load_context(new_contexts, &project, &None, cx))
2680 .await;
2681 let message2_id = thread.update(cx, |thread, cx| {
2682 thread.insert_user_message("Message 2", loaded_context, None, Vec::new(), cx)
2683 });
2684
2685 // Third message with all three contexts (contexts 1 and 2 should be skipped)
2686 //
2687 add_file_to_context(&project, &context_store, "test/file3.rs", cx)
2688 .await
2689 .unwrap();
2690 let new_contexts = context_store.update(cx, |store, cx| {
2691 store.new_context_for_thread(thread.read(cx), None)
2692 });
2693 assert_eq!(new_contexts.len(), 1);
2694 let loaded_context = cx
2695 .update(|cx| load_context(new_contexts, &project, &None, cx))
2696 .await;
2697 let message3_id = thread.update(cx, |thread, cx| {
2698 thread.insert_user_message("Message 3", loaded_context, None, Vec::new(), cx)
2699 });
2700
2701 // Check what contexts are included in each message
2702 let (message1, message2, message3) = thread.read_with(cx, |thread, _| {
2703 (
2704 thread.message(message1_id).unwrap().clone(),
2705 thread.message(message2_id).unwrap().clone(),
2706 thread.message(message3_id).unwrap().clone(),
2707 )
2708 });
2709
2710 // First message should include context 1
2711 assert!(message1.loaded_context.text.contains("file1.rs"));
2712
2713 // Second message should include only context 2 (not 1)
2714 assert!(!message2.loaded_context.text.contains("file1.rs"));
2715 assert!(message2.loaded_context.text.contains("file2.rs"));
2716
2717 // Third message should include only context 3 (not 1 or 2)
2718 assert!(!message3.loaded_context.text.contains("file1.rs"));
2719 assert!(!message3.loaded_context.text.contains("file2.rs"));
2720 assert!(message3.loaded_context.text.contains("file3.rs"));
2721
2722 // Check entire request to make sure all contexts are properly included
2723 let request = thread.update(cx, |thread, cx| {
2724 thread.to_completion_request(model.clone(), cx)
2725 });
2726
2727 // The request should contain all 3 messages
2728 assert_eq!(request.messages.len(), 4);
2729
2730 // Check that the contexts are properly formatted in each message
2731 assert!(request.messages[1].string_contents().contains("file1.rs"));
2732 assert!(!request.messages[1].string_contents().contains("file2.rs"));
2733 assert!(!request.messages[1].string_contents().contains("file3.rs"));
2734
2735 assert!(!request.messages[2].string_contents().contains("file1.rs"));
2736 assert!(request.messages[2].string_contents().contains("file2.rs"));
2737 assert!(!request.messages[2].string_contents().contains("file3.rs"));
2738
2739 assert!(!request.messages[3].string_contents().contains("file1.rs"));
2740 assert!(!request.messages[3].string_contents().contains("file2.rs"));
2741 assert!(request.messages[3].string_contents().contains("file3.rs"));
2742
2743 add_file_to_context(&project, &context_store, "test/file4.rs", cx)
2744 .await
2745 .unwrap();
2746 let new_contexts = context_store.update(cx, |store, cx| {
2747 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2748 });
2749 assert_eq!(new_contexts.len(), 3);
2750 let loaded_context = cx
2751 .update(|cx| load_context(new_contexts, &project, &None, cx))
2752 .await
2753 .loaded_context;
2754
2755 assert!(!loaded_context.text.contains("file1.rs"));
2756 assert!(loaded_context.text.contains("file2.rs"));
2757 assert!(loaded_context.text.contains("file3.rs"));
2758 assert!(loaded_context.text.contains("file4.rs"));
2759
2760 let new_contexts = context_store.update(cx, |store, cx| {
2761 // Remove file4.rs
2762 store.remove_context(&loaded_context.contexts[2].handle(), cx);
2763 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2764 });
2765 assert_eq!(new_contexts.len(), 2);
2766 let loaded_context = cx
2767 .update(|cx| load_context(new_contexts, &project, &None, cx))
2768 .await
2769 .loaded_context;
2770
2771 assert!(!loaded_context.text.contains("file1.rs"));
2772 assert!(loaded_context.text.contains("file2.rs"));
2773 assert!(loaded_context.text.contains("file3.rs"));
2774 assert!(!loaded_context.text.contains("file4.rs"));
2775
2776 let new_contexts = context_store.update(cx, |store, cx| {
2777 // Remove file3.rs
2778 store.remove_context(&loaded_context.contexts[1].handle(), cx);
2779 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2780 });
2781 assert_eq!(new_contexts.len(), 1);
2782 let loaded_context = cx
2783 .update(|cx| load_context(new_contexts, &project, &None, cx))
2784 .await
2785 .loaded_context;
2786
2787 assert!(!loaded_context.text.contains("file1.rs"));
2788 assert!(loaded_context.text.contains("file2.rs"));
2789 assert!(!loaded_context.text.contains("file3.rs"));
2790 assert!(!loaded_context.text.contains("file4.rs"));
2791 }
2792
2793 #[gpui::test]
2794 async fn test_message_without_files(cx: &mut TestAppContext) {
2795 init_test_settings(cx);
2796
2797 let project = create_test_project(
2798 cx,
2799 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2800 )
2801 .await;
2802
2803 let (_, _thread_store, thread, _context_store, model) =
2804 setup_test_environment(cx, project.clone()).await;
2805
2806 // Insert user message without any context (empty context vector)
2807 let message_id = thread.update(cx, |thread, cx| {
2808 thread.insert_user_message(
2809 "What is the best way to learn Rust?",
2810 ContextLoadResult::default(),
2811 None,
2812 Vec::new(),
2813 cx,
2814 )
2815 });
2816
2817 // Check content and context in message object
2818 let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());
2819
2820 // Context should be empty when no files are included
2821 assert_eq!(message.role, Role::User);
2822 assert_eq!(message.segments.len(), 1);
2823 assert_eq!(
2824 message.segments[0],
2825 MessageSegment::Text("What is the best way to learn Rust?".to_string())
2826 );
2827 assert_eq!(message.loaded_context.text, "");
2828
2829 // Check message in request
2830 let request = thread.update(cx, |thread, cx| {
2831 thread.to_completion_request(model.clone(), cx)
2832 });
2833
2834 assert_eq!(request.messages.len(), 2);
2835 assert_eq!(
2836 request.messages[1].string_contents(),
2837 "What is the best way to learn Rust?"
2838 );
2839
2840 // Add second message, also without context
2841 let message2_id = thread.update(cx, |thread, cx| {
2842 thread.insert_user_message(
2843 "Are there any good books?",
2844 ContextLoadResult::default(),
2845 None,
2846 Vec::new(),
2847 cx,
2848 )
2849 });
2850
2851 let message2 =
2852 thread.read_with(cx, |thread, _| thread.message(message2_id).unwrap().clone());
2853 assert_eq!(message2.loaded_context.text, "");
2854
2855 // Check that both messages appear in the request
2856 let request = thread.update(cx, |thread, cx| {
2857 thread.to_completion_request(model.clone(), cx)
2858 });
2859
2860 assert_eq!(request.messages.len(), 3);
2861 assert_eq!(
2862 request.messages[1].string_contents(),
2863 "What is the best way to learn Rust?"
2864 );
2865 assert_eq!(
2866 request.messages[2].string_contents(),
2867 "Are there any good books?"
2868 );
2869 }
2870
2871 #[gpui::test]
2872 async fn test_stale_buffer_notification(cx: &mut TestAppContext) {
2873 init_test_settings(cx);
2874
2875 let project = create_test_project(
2876 cx,
2877 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2878 )
2879 .await;
2880
2881 let (_workspace, _thread_store, thread, context_store, model) =
2882 setup_test_environment(cx, project.clone()).await;
2883
2884 // Open buffer and add it to context
2885 let buffer = add_file_to_context(&project, &context_store, "test/code.rs", cx)
2886 .await
2887 .unwrap();
2888
2889 let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
2890 let loaded_context = cx
2891 .update(|cx| load_context(vec![context], &project, &None, cx))
2892 .await;
2893
2894 // Insert user message with the buffer as context
2895 thread.update(cx, |thread, cx| {
2896 thread.insert_user_message("Explain this code", loaded_context, None, Vec::new(), cx)
2897 });
2898
2899 // Create a request and check that it doesn't have a stale buffer warning yet
2900 let initial_request = thread.update(cx, |thread, cx| {
2901 thread.to_completion_request(model.clone(), cx)
2902 });
2903
2904 // Make sure we don't have a stale file warning yet
2905 let has_stale_warning = initial_request.messages.iter().any(|msg| {
2906 msg.string_contents()
2907 .contains("These files changed since last read:")
2908 });
2909 assert!(
2910 !has_stale_warning,
2911 "Should not have stale buffer warning before buffer is modified"
2912 );
2913
2914 // Modify the buffer
2915 buffer.update(cx, |buffer, cx| {
2916 // Find a position at the end of line 1
2917 buffer.edit(
2918 [(1..1, "\n println!(\"Added a new line\");\n")],
2919 None,
2920 cx,
2921 );
2922 });
2923
2924 // Insert another user message without context
2925 thread.update(cx, |thread, cx| {
2926 thread.insert_user_message(
2927 "What does the code do now?",
2928 ContextLoadResult::default(),
2929 None,
2930 Vec::new(),
2931 cx,
2932 )
2933 });
2934
2935 // Create a new request and check for the stale buffer warning
2936 let new_request = thread.update(cx, |thread, cx| {
2937 thread.to_completion_request(model.clone(), cx)
2938 });
2939
2940 // We should have a stale file warning as the last message
2941 let last_message = new_request
2942 .messages
2943 .last()
2944 .expect("Request should have messages");
2945
2946 // The last message should be the stale buffer notification
2947 assert_eq!(last_message.role, Role::User);
2948
2949 // Check the exact content of the message
2950 let expected_content = "These files changed since last read:\n- code.rs\n";
2951 assert_eq!(
2952 last_message.string_contents(),
2953 expected_content,
2954 "Last message should be exactly the stale buffer notification"
2955 );
2956 }
2957
2958 fn init_test_settings(cx: &mut TestAppContext) {
2959 cx.update(|cx| {
2960 let settings_store = SettingsStore::test(cx);
2961 cx.set_global(settings_store);
2962 language::init(cx);
2963 Project::init_settings(cx);
2964 AssistantSettings::register(cx);
2965 prompt_store::init(cx);
2966 thread_store::init(cx);
2967 workspace::init_settings(cx);
2968 language_model::init_settings(cx);
2969 ThemeSettings::register(cx);
2970 ContextServerSettings::register(cx);
2971 EditorSettings::register(cx);
2972 ToolRegistry::default_global(cx);
2973 });
2974 }
2975
2976 // Helper to create a test project with test files
2977 async fn create_test_project(
2978 cx: &mut TestAppContext,
2979 files: serde_json::Value,
2980 ) -> Entity<Project> {
2981 let fs = FakeFs::new(cx.executor());
2982 fs.insert_tree(path!("/test"), files).await;
2983 Project::test(fs, [path!("/test").as_ref()], cx).await
2984 }
2985
2986 async fn setup_test_environment(
2987 cx: &mut TestAppContext,
2988 project: Entity<Project>,
2989 ) -> (
2990 Entity<Workspace>,
2991 Entity<ThreadStore>,
2992 Entity<Thread>,
2993 Entity<ContextStore>,
2994 Arc<dyn LanguageModel>,
2995 ) {
2996 let (workspace, cx) =
2997 cx.add_window_view(|window, cx| Workspace::test_new(project.clone(), window, cx));
2998
2999 let thread_store = cx
3000 .update(|_, cx| {
3001 ThreadStore::load(
3002 project.clone(),
3003 cx.new(|_| ToolWorkingSet::default()),
3004 None,
3005 Arc::new(PromptBuilder::new(None).unwrap()),
3006 cx,
3007 )
3008 })
3009 .await
3010 .unwrap();
3011
3012 let thread = thread_store.update(cx, |store, cx| store.create_thread(cx));
3013 let context_store = cx.new(|_cx| ContextStore::new(project.downgrade(), None));
3014
3015 let model = FakeLanguageModel::default();
3016 let model: Arc<dyn LanguageModel> = Arc::new(model);
3017
3018 (workspace, thread_store, thread, context_store, model)
3019 }
3020
3021 async fn add_file_to_context(
3022 project: &Entity<Project>,
3023 context_store: &Entity<ContextStore>,
3024 path: &str,
3025 cx: &mut TestAppContext,
3026 ) -> Result<Entity<language::Buffer>> {
3027 let buffer_path = project
3028 .read_with(cx, |project, cx| project.find_project_path(path, cx))
3029 .unwrap();
3030
3031 let buffer = project
3032 .update(cx, |project, cx| {
3033 project.open_buffer(buffer_path.clone(), cx)
3034 })
3035 .await
3036 .unwrap();
3037
3038 context_store.update(cx, |context_store, cx| {
3039 context_store.add_file_from_buffer(&buffer_path, buffer.clone(), false, cx);
3040 });
3041
3042 Ok(buffer)
3043 }
3044}