1use std::fmt::Write as _;
2use std::io::Write;
3use std::ops::Range;
4use std::sync::Arc;
5
6use anyhow::{Context as _, Result};
7use assistant_tool::{ActionLog, ToolWorkingSet};
8use chrono::{DateTime, Utc};
9use collections::{BTreeMap, HashMap, HashSet};
10use futures::future::Shared;
11use futures::{FutureExt, StreamExt as _};
12use git;
13use gpui::{App, AppContext, Context, Entity, EventEmitter, SharedString, Task};
14use language_model::{
15 LanguageModel, LanguageModelCompletionEvent, LanguageModelRegistry, LanguageModelRequest,
16 LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
17 LanguageModelToolUseId, MaxMonthlySpendReachedError, MessageContent, PaymentRequiredError,
18 Role, StopReason, TokenUsage,
19};
20use project::Project;
21use prompt_store::{AssistantSystemPromptWorktree, PromptBuilder};
22use scripting_tool::{ScriptingSession, ScriptingTool};
23use serde::{Deserialize, Serialize};
24use util::{post_inc, ResultExt, TryFutureExt as _};
25use uuid::Uuid;
26
27use crate::context::{attach_context_to_message, ContextId, ContextSnapshot};
28use crate::thread_store::{
29 SerializedMessage, SerializedThread, SerializedToolResult, SerializedToolUse,
30};
31use crate::tool_use::{PendingToolUse, ToolUse, ToolUseState};
32
33#[derive(Debug, Clone, Copy)]
34pub enum RequestKind {
35 Chat,
36 /// Used when summarizing a thread.
37 Summarize,
38}
39
40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
41pub struct ThreadId(Arc<str>);
42
43impl ThreadId {
44 pub fn new() -> Self {
45 Self(Uuid::new_v4().to_string().into())
46 }
47}
48
49impl std::fmt::Display for ThreadId {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "{}", self.0)
52 }
53}
54
55#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Serialize, Deserialize)]
56pub struct MessageId(pub(crate) usize);
57
58impl MessageId {
59 fn post_inc(&mut self) -> Self {
60 Self(post_inc(&mut self.0))
61 }
62}
63
64/// A message in a [`Thread`].
65#[derive(Debug, Clone)]
66pub struct Message {
67 pub id: MessageId,
68 pub role: Role,
69 pub text: String,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct ProjectSnapshot {
74 pub worktree_snapshots: Vec<WorktreeSnapshot>,
75 pub unsaved_buffer_paths: Vec<String>,
76 pub timestamp: DateTime<Utc>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct WorktreeSnapshot {
81 pub worktree_path: String,
82 pub git_state: Option<GitState>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct GitState {
87 pub remote_url: Option<String>,
88 pub head_sha: Option<String>,
89 pub current_branch: Option<String>,
90 pub diff: Option<String>,
91}
92
93/// A thread of conversation with the LLM.
94pub struct Thread {
95 id: ThreadId,
96 updated_at: DateTime<Utc>,
97 summary: Option<SharedString>,
98 pending_summary: Task<Option<()>>,
99 messages: Vec<Message>,
100 next_message_id: MessageId,
101 context: BTreeMap<ContextId, ContextSnapshot>,
102 context_by_message: HashMap<MessageId, Vec<ContextId>>,
103 completion_count: usize,
104 pending_completions: Vec<PendingCompletion>,
105 project: Entity<Project>,
106 prompt_builder: Arc<PromptBuilder>,
107 tools: Arc<ToolWorkingSet>,
108 tool_use: ToolUseState,
109 action_log: Entity<ActionLog>,
110 scripting_session: Entity<ScriptingSession>,
111 scripting_tool_use: ToolUseState,
112 initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
113 cumulative_token_usage: TokenUsage,
114}
115
116impl Thread {
117 pub fn new(
118 project: Entity<Project>,
119 tools: Arc<ToolWorkingSet>,
120 prompt_builder: Arc<PromptBuilder>,
121 cx: &mut Context<Self>,
122 ) -> Self {
123 Self {
124 id: ThreadId::new(),
125 updated_at: Utc::now(),
126 summary: None,
127 pending_summary: Task::ready(None),
128 messages: Vec::new(),
129 next_message_id: MessageId(0),
130 context: BTreeMap::default(),
131 context_by_message: HashMap::default(),
132 completion_count: 0,
133 pending_completions: Vec::new(),
134 project: project.clone(),
135 prompt_builder,
136 tools,
137 tool_use: ToolUseState::new(),
138 scripting_session: cx.new(|cx| ScriptingSession::new(project.clone(), cx)),
139 scripting_tool_use: ToolUseState::new(),
140 action_log: cx.new(|_| ActionLog::new()),
141 initial_project_snapshot: {
142 let project_snapshot = Self::project_snapshot(project, cx);
143 cx.foreground_executor()
144 .spawn(async move { Some(project_snapshot.await) })
145 .shared()
146 },
147 cumulative_token_usage: TokenUsage::default(),
148 }
149 }
150
151 pub fn deserialize(
152 id: ThreadId,
153 serialized: SerializedThread,
154 project: Entity<Project>,
155 tools: Arc<ToolWorkingSet>,
156 prompt_builder: Arc<PromptBuilder>,
157 cx: &mut Context<Self>,
158 ) -> Self {
159 let next_message_id = MessageId(
160 serialized
161 .messages
162 .last()
163 .map(|message| message.id.0 + 1)
164 .unwrap_or(0),
165 );
166 let tool_use = ToolUseState::from_serialized_messages(&serialized.messages, |name| {
167 name != ScriptingTool::NAME
168 });
169 let scripting_tool_use =
170 ToolUseState::from_serialized_messages(&serialized.messages, |name| {
171 name == ScriptingTool::NAME
172 });
173 let scripting_session = cx.new(|cx| ScriptingSession::new(project.clone(), cx));
174
175 Self {
176 id,
177 updated_at: serialized.updated_at,
178 summary: Some(serialized.summary),
179 pending_summary: Task::ready(None),
180 messages: serialized
181 .messages
182 .into_iter()
183 .map(|message| Message {
184 id: message.id,
185 role: message.role,
186 text: message.text,
187 })
188 .collect(),
189 next_message_id,
190 context: BTreeMap::default(),
191 context_by_message: HashMap::default(),
192 completion_count: 0,
193 pending_completions: Vec::new(),
194 project,
195 prompt_builder,
196 tools,
197 tool_use,
198 action_log: cx.new(|_| ActionLog::new()),
199 scripting_session,
200 scripting_tool_use,
201 initial_project_snapshot: Task::ready(serialized.initial_project_snapshot).shared(),
202 // TODO: persist token usage?
203 cumulative_token_usage: TokenUsage::default(),
204 }
205 }
206
207 pub fn id(&self) -> &ThreadId {
208 &self.id
209 }
210
211 pub fn is_empty(&self) -> bool {
212 self.messages.is_empty()
213 }
214
215 pub fn updated_at(&self) -> DateTime<Utc> {
216 self.updated_at
217 }
218
219 pub fn touch_updated_at(&mut self) {
220 self.updated_at = Utc::now();
221 }
222
223 pub fn summary(&self) -> Option<SharedString> {
224 self.summary.clone()
225 }
226
227 pub fn summary_or_default(&self) -> SharedString {
228 const DEFAULT: SharedString = SharedString::new_static("New Thread");
229 self.summary.clone().unwrap_or(DEFAULT)
230 }
231
232 pub fn set_summary(&mut self, summary: impl Into<SharedString>, cx: &mut Context<Self>) {
233 self.summary = Some(summary.into());
234 cx.emit(ThreadEvent::SummaryChanged);
235 }
236
237 pub fn message(&self, id: MessageId) -> Option<&Message> {
238 self.messages.iter().find(|message| message.id == id)
239 }
240
241 pub fn messages(&self) -> impl Iterator<Item = &Message> {
242 self.messages.iter()
243 }
244
245 pub fn is_generating(&self) -> bool {
246 !self.pending_completions.is_empty() || !self.all_tools_finished()
247 }
248
249 pub fn tools(&self) -> &Arc<ToolWorkingSet> {
250 &self.tools
251 }
252
253 pub fn context_for_message(&self, id: MessageId) -> Option<Vec<ContextSnapshot>> {
254 let context = self.context_by_message.get(&id)?;
255 Some(
256 context
257 .into_iter()
258 .filter_map(|context_id| self.context.get(&context_id))
259 .cloned()
260 .collect::<Vec<_>>(),
261 )
262 }
263
264 /// Returns whether all of the tool uses have finished running.
265 pub fn all_tools_finished(&self) -> bool {
266 let mut all_pending_tool_uses = self
267 .tool_use
268 .pending_tool_uses()
269 .into_iter()
270 .chain(self.scripting_tool_use.pending_tool_uses());
271
272 // If the only pending tool uses left are the ones with errors, then
273 // that means that we've finished running all of the pending tools.
274 all_pending_tool_uses.all(|tool_use| tool_use.status.is_error())
275 }
276
277 pub fn tool_uses_for_message(&self, id: MessageId) -> Vec<ToolUse> {
278 self.tool_use.tool_uses_for_message(id)
279 }
280
281 pub fn scripting_tool_uses_for_message(&self, id: MessageId) -> Vec<ToolUse> {
282 self.scripting_tool_use.tool_uses_for_message(id)
283 }
284
285 pub fn tool_results_for_message(&self, id: MessageId) -> Vec<&LanguageModelToolResult> {
286 self.tool_use.tool_results_for_message(id)
287 }
288
289 pub fn tool_result(&self, id: &LanguageModelToolUseId) -> Option<&LanguageModelToolResult> {
290 self.tool_use.tool_result(id)
291 }
292
293 pub fn scripting_tool_results_for_message(
294 &self,
295 id: MessageId,
296 ) -> Vec<&LanguageModelToolResult> {
297 self.scripting_tool_use.tool_results_for_message(id)
298 }
299
300 pub fn scripting_changed_buffers<'a>(
301 &self,
302 cx: &'a App,
303 ) -> impl ExactSizeIterator<Item = &'a Entity<language::Buffer>> {
304 self.scripting_session.read(cx).changed_buffers()
305 }
306
307 pub fn message_has_tool_results(&self, message_id: MessageId) -> bool {
308 self.tool_use.message_has_tool_results(message_id)
309 }
310
311 pub fn message_has_scripting_tool_results(&self, message_id: MessageId) -> bool {
312 self.scripting_tool_use.message_has_tool_results(message_id)
313 }
314
315 pub fn insert_user_message(
316 &mut self,
317 text: impl Into<String>,
318 context: Vec<ContextSnapshot>,
319 cx: &mut Context<Self>,
320 ) -> MessageId {
321 let message_id = self.insert_message(Role::User, text, cx);
322 let context_ids = context.iter().map(|context| context.id).collect::<Vec<_>>();
323 self.context
324 .extend(context.into_iter().map(|context| (context.id, context)));
325 self.context_by_message.insert(message_id, context_ids);
326 message_id
327 }
328
329 pub fn insert_message(
330 &mut self,
331 role: Role,
332 text: impl Into<String>,
333 cx: &mut Context<Self>,
334 ) -> MessageId {
335 let id = self.next_message_id.post_inc();
336 self.messages.push(Message {
337 id,
338 role,
339 text: text.into(),
340 });
341 self.touch_updated_at();
342 cx.emit(ThreadEvent::MessageAdded(id));
343 id
344 }
345
346 pub fn edit_message(
347 &mut self,
348 id: MessageId,
349 new_role: Role,
350 new_text: String,
351 cx: &mut Context<Self>,
352 ) -> bool {
353 let Some(message) = self.messages.iter_mut().find(|message| message.id == id) else {
354 return false;
355 };
356 message.role = new_role;
357 message.text = new_text;
358 self.touch_updated_at();
359 cx.emit(ThreadEvent::MessageEdited(id));
360 true
361 }
362
363 pub fn delete_message(&mut self, id: MessageId, cx: &mut Context<Self>) -> bool {
364 let Some(index) = self.messages.iter().position(|message| message.id == id) else {
365 return false;
366 };
367 self.messages.remove(index);
368 self.context_by_message.remove(&id);
369 self.touch_updated_at();
370 cx.emit(ThreadEvent::MessageDeleted(id));
371 true
372 }
373
374 /// Returns the representation of this [`Thread`] in a textual form.
375 ///
376 /// This is the representation we use when attaching a thread as context to another thread.
377 pub fn text(&self) -> String {
378 let mut text = String::new();
379
380 for message in &self.messages {
381 text.push_str(match message.role {
382 language_model::Role::User => "User:",
383 language_model::Role::Assistant => "Assistant:",
384 language_model::Role::System => "System:",
385 });
386 text.push('\n');
387
388 text.push_str(&message.text);
389 text.push('\n');
390 }
391
392 text
393 }
394
395 /// Serializes this thread into a format for storage or telemetry.
396 pub fn serialize(&self, cx: &mut Context<Self>) -> Task<Result<SerializedThread>> {
397 let initial_project_snapshot = self.initial_project_snapshot.clone();
398 cx.spawn(async move |this, cx| {
399 let initial_project_snapshot = initial_project_snapshot.await;
400 this.read_with(cx, |this, _| SerializedThread {
401 summary: this.summary_or_default(),
402 updated_at: this.updated_at(),
403 messages: this
404 .messages()
405 .map(|message| SerializedMessage {
406 id: message.id,
407 role: message.role,
408 text: message.text.clone(),
409 tool_uses: this
410 .tool_uses_for_message(message.id)
411 .into_iter()
412 .chain(this.scripting_tool_uses_for_message(message.id))
413 .map(|tool_use| SerializedToolUse {
414 id: tool_use.id,
415 name: tool_use.name,
416 input: tool_use.input,
417 })
418 .collect(),
419 tool_results: this
420 .tool_results_for_message(message.id)
421 .into_iter()
422 .chain(this.scripting_tool_results_for_message(message.id))
423 .map(|tool_result| SerializedToolResult {
424 tool_use_id: tool_result.tool_use_id.clone(),
425 is_error: tool_result.is_error,
426 content: tool_result.content.clone(),
427 })
428 .collect(),
429 })
430 .collect(),
431 initial_project_snapshot,
432 })
433 })
434 }
435
436 pub fn send_to_model(
437 &mut self,
438 model: Arc<dyn LanguageModel>,
439 request_kind: RequestKind,
440 cx: &mut Context<Self>,
441 ) {
442 let mut request = self.to_completion_request(request_kind, cx);
443 request.tools = {
444 let mut tools = Vec::new();
445
446 if self.tools.is_scripting_tool_enabled() {
447 tools.push(LanguageModelRequestTool {
448 name: ScriptingTool::NAME.into(),
449 description: ScriptingTool::DESCRIPTION.into(),
450 input_schema: ScriptingTool::input_schema(),
451 });
452 }
453
454 tools.extend(self.tools().enabled_tools(cx).into_iter().map(|tool| {
455 LanguageModelRequestTool {
456 name: tool.name(),
457 description: tool.description(),
458 input_schema: tool.input_schema(),
459 }
460 }));
461
462 tools
463 };
464
465 self.stream_completion(request, model, cx);
466 }
467
468 pub fn to_completion_request(
469 &self,
470 request_kind: RequestKind,
471 cx: &App,
472 ) -> LanguageModelRequest {
473 let worktree_root_names = self
474 .project
475 .read(cx)
476 .visible_worktrees(cx)
477 .map(|worktree| {
478 let worktree = worktree.read(cx);
479 AssistantSystemPromptWorktree {
480 root_name: worktree.root_name().into(),
481 abs_path: worktree.abs_path(),
482 }
483 })
484 .collect::<Vec<_>>();
485 let system_prompt = self
486 .prompt_builder
487 .generate_assistant_system_prompt(worktree_root_names)
488 .context("failed to generate assistant system prompt")
489 .log_err()
490 .unwrap_or_default();
491
492 let mut request = LanguageModelRequest {
493 messages: vec![LanguageModelRequestMessage {
494 role: Role::System,
495 content: vec![MessageContent::Text(system_prompt)],
496 cache: true,
497 }],
498 tools: Vec::new(),
499 stop: Vec::new(),
500 temperature: None,
501 };
502
503 let mut referenced_context_ids = HashSet::default();
504
505 for message in &self.messages {
506 if let Some(context_ids) = self.context_by_message.get(&message.id) {
507 referenced_context_ids.extend(context_ids);
508 }
509
510 let mut request_message = LanguageModelRequestMessage {
511 role: message.role,
512 content: Vec::new(),
513 cache: false,
514 };
515
516 match request_kind {
517 RequestKind::Chat => {
518 self.tool_use
519 .attach_tool_results(message.id, &mut request_message);
520 self.scripting_tool_use
521 .attach_tool_results(message.id, &mut request_message);
522 }
523 RequestKind::Summarize => {
524 // We don't care about tool use during summarization.
525 }
526 }
527
528 if !message.text.is_empty() {
529 request_message
530 .content
531 .push(MessageContent::Text(message.text.clone()));
532 }
533
534 match request_kind {
535 RequestKind::Chat => {
536 self.tool_use
537 .attach_tool_uses(message.id, &mut request_message);
538 self.scripting_tool_use
539 .attach_tool_uses(message.id, &mut request_message);
540 }
541 RequestKind::Summarize => {
542 // We don't care about tool use during summarization.
543 }
544 };
545
546 request.messages.push(request_message);
547 }
548
549 if !referenced_context_ids.is_empty() {
550 let mut context_message = LanguageModelRequestMessage {
551 role: Role::User,
552 content: Vec::new(),
553 cache: false,
554 };
555
556 let referenced_context = referenced_context_ids
557 .into_iter()
558 .filter_map(|context_id| self.context.get(context_id))
559 .cloned();
560 attach_context_to_message(&mut context_message, referenced_context);
561
562 request.messages.push(context_message);
563 }
564
565 self.attach_stale_files(&mut request.messages, cx);
566
567 request
568 }
569
570 fn attach_stale_files(&self, messages: &mut Vec<LanguageModelRequestMessage>, cx: &App) {
571 const STALE_FILES_HEADER: &str = "These files changed since last read:";
572
573 let mut stale_message = String::new();
574
575 for stale_file in self.action_log.read(cx).stale_buffers(cx) {
576 let Some(file) = stale_file.read(cx).file() else {
577 continue;
578 };
579
580 if stale_message.is_empty() {
581 write!(&mut stale_message, "{}", STALE_FILES_HEADER).ok();
582 }
583
584 writeln!(&mut stale_message, "- {}", file.path().display()).ok();
585 }
586
587 if !stale_message.is_empty() {
588 let context_message = LanguageModelRequestMessage {
589 role: Role::User,
590 content: vec![stale_message.into()],
591 cache: false,
592 };
593
594 messages.push(context_message);
595 }
596 }
597
598 pub fn stream_completion(
599 &mut self,
600 request: LanguageModelRequest,
601 model: Arc<dyn LanguageModel>,
602 cx: &mut Context<Self>,
603 ) {
604 let pending_completion_id = post_inc(&mut self.completion_count);
605
606 let task = cx.spawn(async move |thread, cx| {
607 let stream = model.stream_completion(request, &cx);
608 let stream_completion = async {
609 let mut events = stream.await?;
610 let mut stop_reason = StopReason::EndTurn;
611 let mut current_token_usage = TokenUsage::default();
612
613 while let Some(event) = events.next().await {
614 let event = event?;
615
616 thread.update(cx, |thread, cx| {
617 match event {
618 LanguageModelCompletionEvent::StartMessage { .. } => {
619 thread.insert_message(Role::Assistant, String::new(), cx);
620 }
621 LanguageModelCompletionEvent::Stop(reason) => {
622 stop_reason = reason;
623 }
624 LanguageModelCompletionEvent::UsageUpdate(token_usage) => {
625 thread.cumulative_token_usage =
626 thread.cumulative_token_usage.clone() + token_usage.clone()
627 - current_token_usage.clone();
628 current_token_usage = token_usage;
629 }
630 LanguageModelCompletionEvent::Text(chunk) => {
631 if let Some(last_message) = thread.messages.last_mut() {
632 if last_message.role == Role::Assistant {
633 last_message.text.push_str(&chunk);
634 cx.emit(ThreadEvent::StreamedAssistantText(
635 last_message.id,
636 chunk,
637 ));
638 } else {
639 // If we won't have an Assistant message yet, assume this chunk marks the beginning
640 // of a new Assistant response.
641 //
642 // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
643 // will result in duplicating the text of the chunk in the rendered Markdown.
644 thread.insert_message(Role::Assistant, chunk, cx);
645 };
646 }
647 }
648 LanguageModelCompletionEvent::ToolUse(tool_use) => {
649 if let Some(last_assistant_message) = thread
650 .messages
651 .iter()
652 .rfind(|message| message.role == Role::Assistant)
653 {
654 if tool_use.name.as_ref() == ScriptingTool::NAME {
655 thread
656 .scripting_tool_use
657 .request_tool_use(last_assistant_message.id, tool_use);
658 } else {
659 thread
660 .tool_use
661 .request_tool_use(last_assistant_message.id, tool_use);
662 }
663 }
664 }
665 }
666
667 thread.touch_updated_at();
668 cx.emit(ThreadEvent::StreamedCompletion);
669 cx.notify();
670 })?;
671
672 smol::future::yield_now().await;
673 }
674
675 thread.update(cx, |thread, cx| {
676 thread
677 .pending_completions
678 .retain(|completion| completion.id != pending_completion_id);
679
680 if thread.summary.is_none() && thread.messages.len() >= 2 {
681 thread.summarize(cx);
682 }
683 })?;
684
685 anyhow::Ok(stop_reason)
686 };
687
688 let result = stream_completion.await;
689
690 thread
691 .update(cx, |thread, cx| {
692 match result.as_ref() {
693 Ok(stop_reason) => match stop_reason {
694 StopReason::ToolUse => {
695 cx.emit(ThreadEvent::UsePendingTools);
696 }
697 StopReason::EndTurn => {}
698 StopReason::MaxTokens => {}
699 },
700 Err(error) => {
701 if error.is::<PaymentRequiredError>() {
702 cx.emit(ThreadEvent::ShowError(ThreadError::PaymentRequired));
703 } else if error.is::<MaxMonthlySpendReachedError>() {
704 cx.emit(ThreadEvent::ShowError(
705 ThreadError::MaxMonthlySpendReached,
706 ));
707 } else {
708 let error_message = error
709 .chain()
710 .map(|err| err.to_string())
711 .collect::<Vec<_>>()
712 .join("\n");
713 cx.emit(ThreadEvent::ShowError(ThreadError::Message(
714 SharedString::from(error_message.clone()),
715 )));
716 }
717
718 thread.cancel_last_completion(cx);
719 }
720 }
721 cx.emit(ThreadEvent::DoneStreaming);
722 })
723 .ok();
724 });
725
726 self.pending_completions.push(PendingCompletion {
727 id: pending_completion_id,
728 _task: task,
729 });
730 }
731
732 pub fn summarize(&mut self, cx: &mut Context<Self>) {
733 let Some(provider) = LanguageModelRegistry::read_global(cx).active_provider() else {
734 return;
735 };
736 let Some(model) = LanguageModelRegistry::read_global(cx).active_model() else {
737 return;
738 };
739
740 if !provider.is_authenticated(cx) {
741 return;
742 }
743
744 let mut request = self.to_completion_request(RequestKind::Summarize, cx);
745 request.messages.push(LanguageModelRequestMessage {
746 role: Role::User,
747 content: vec![
748 "Generate a concise 3-7 word title for this conversation, omitting punctuation. Go straight to the title, without any preamble and prefix like `Here's a concise suggestion:...` or `Title:`"
749 .into(),
750 ],
751 cache: false,
752 });
753
754 self.pending_summary = cx.spawn(async move |this, cx| {
755 async move {
756 let stream = model.stream_completion_text(request, &cx);
757 let mut messages = stream.await?;
758
759 let mut new_summary = String::new();
760 while let Some(message) = messages.stream.next().await {
761 let text = message?;
762 let mut lines = text.lines();
763 new_summary.extend(lines.next());
764
765 // Stop if the LLM generated multiple lines.
766 if lines.next().is_some() {
767 break;
768 }
769 }
770
771 this.update(cx, |this, cx| {
772 if !new_summary.is_empty() {
773 this.summary = Some(new_summary.into());
774 }
775
776 cx.emit(ThreadEvent::SummaryChanged);
777 })?;
778
779 anyhow::Ok(())
780 }
781 .log_err()
782 .await
783 });
784 }
785
786 pub fn use_pending_tools(&mut self, cx: &mut Context<Self>) {
787 let request = self.to_completion_request(RequestKind::Chat, cx);
788 let pending_tool_uses = self
789 .tool_use
790 .pending_tool_uses()
791 .into_iter()
792 .filter(|tool_use| tool_use.status.is_idle())
793 .cloned()
794 .collect::<Vec<_>>();
795
796 for tool_use in pending_tool_uses {
797 if let Some(tool) = self.tools.tool(&tool_use.name, cx) {
798 let task = tool.run(
799 tool_use.input,
800 &request.messages,
801 self.project.clone(),
802 self.action_log.clone(),
803 cx,
804 );
805
806 self.insert_tool_output(tool_use.id.clone(), task, cx);
807 }
808 }
809
810 let pending_scripting_tool_uses = self
811 .scripting_tool_use
812 .pending_tool_uses()
813 .into_iter()
814 .filter(|tool_use| tool_use.status.is_idle())
815 .cloned()
816 .collect::<Vec<_>>();
817
818 for scripting_tool_use in pending_scripting_tool_uses {
819 let task = match ScriptingTool::deserialize_input(scripting_tool_use.input) {
820 Err(err) => Task::ready(Err(err.into())),
821 Ok(input) => {
822 let (script_id, script_task) =
823 self.scripting_session.update(cx, move |session, cx| {
824 session.run_script(input.lua_script, cx)
825 });
826
827 let session = self.scripting_session.clone();
828 cx.spawn(async move |_, cx| {
829 script_task.await;
830
831 let message = session.read_with(cx, |session, _cx| {
832 // Using a id to get the script output seems impractical.
833 // Why not just include it in the Task result?
834 // This is because we'll later report the script state as it runs,
835 session
836 .get(script_id)
837 .output_message_for_llm()
838 .expect("Script shouldn't still be running")
839 })?;
840
841 Ok(message)
842 })
843 }
844 };
845
846 self.insert_scripting_tool_output(scripting_tool_use.id.clone(), task, cx);
847 }
848 }
849
850 pub fn insert_tool_output(
851 &mut self,
852 tool_use_id: LanguageModelToolUseId,
853 output: Task<Result<String>>,
854 cx: &mut Context<Self>,
855 ) {
856 let insert_output_task = cx.spawn({
857 let tool_use_id = tool_use_id.clone();
858 async move |thread, cx| {
859 let output = output.await;
860 thread
861 .update(cx, |thread, cx| {
862 let pending_tool_use = thread
863 .tool_use
864 .insert_tool_output(tool_use_id.clone(), output);
865
866 cx.emit(ThreadEvent::ToolFinished {
867 tool_use_id,
868 pending_tool_use,
869 canceled: false,
870 });
871 })
872 .ok();
873 }
874 });
875
876 self.tool_use
877 .run_pending_tool(tool_use_id, insert_output_task);
878 }
879
880 pub fn insert_scripting_tool_output(
881 &mut self,
882 tool_use_id: LanguageModelToolUseId,
883 output: Task<Result<String>>,
884 cx: &mut Context<Self>,
885 ) {
886 let insert_output_task = cx.spawn({
887 let tool_use_id = tool_use_id.clone();
888 async move |thread, cx| {
889 let output = output.await;
890 thread
891 .update(cx, |thread, cx| {
892 let pending_tool_use = thread
893 .scripting_tool_use
894 .insert_tool_output(tool_use_id.clone(), output);
895
896 cx.emit(ThreadEvent::ToolFinished {
897 tool_use_id,
898 pending_tool_use,
899 canceled: false,
900 });
901 })
902 .ok();
903 }
904 });
905
906 self.scripting_tool_use
907 .run_pending_tool(tool_use_id, insert_output_task);
908 }
909
910 pub fn attach_tool_results(
911 &mut self,
912 updated_context: Vec<ContextSnapshot>,
913 cx: &mut Context<Self>,
914 ) {
915 self.context.extend(
916 updated_context
917 .into_iter()
918 .map(|context| (context.id, context)),
919 );
920
921 // Insert a user message to contain the tool results.
922 self.insert_user_message(
923 // TODO: Sending up a user message without any content results in the model sending back
924 // responses that also don't have any content. We currently don't handle this case well,
925 // so for now we provide some text to keep the model on track.
926 "Here are the tool results.",
927 Vec::new(),
928 cx,
929 );
930 }
931
932 /// Cancels the last pending completion, if there are any pending.
933 ///
934 /// Returns whether a completion was canceled.
935 pub fn cancel_last_completion(&mut self, cx: &mut Context<Self>) -> bool {
936 if self.pending_completions.pop().is_some() {
937 true
938 } else {
939 let mut canceled = false;
940 for pending_tool_use in self.tool_use.cancel_pending() {
941 canceled = true;
942 cx.emit(ThreadEvent::ToolFinished {
943 tool_use_id: pending_tool_use.id.clone(),
944 pending_tool_use: Some(pending_tool_use),
945 canceled: true,
946 });
947 }
948 canceled
949 }
950 }
951
952 /// Reports feedback about the thread and stores it in our telemetry backend.
953 pub fn report_feedback(&self, is_positive: bool, cx: &mut Context<Self>) -> Task<Result<()>> {
954 let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
955 let serialized_thread = self.serialize(cx);
956 let thread_id = self.id().clone();
957 let client = self.project.read(cx).client();
958
959 cx.background_spawn(async move {
960 let final_project_snapshot = final_project_snapshot.await;
961 let serialized_thread = serialized_thread.await?;
962 let thread_data =
963 serde_json::to_value(serialized_thread).unwrap_or_else(|_| serde_json::Value::Null);
964
965 let rating = if is_positive { "positive" } else { "negative" };
966 telemetry::event!(
967 "Assistant Thread Rated",
968 rating,
969 thread_id,
970 thread_data,
971 final_project_snapshot
972 );
973 client.telemetry().flush_events();
974
975 Ok(())
976 })
977 }
978
979 pub fn project(&self) -> &Entity<Project> {
980 &self.project
981 }
982
983 /// Create a snapshot of the current project state including git information and unsaved buffers.
984 fn project_snapshot(
985 project: Entity<Project>,
986 cx: &mut Context<Self>,
987 ) -> Task<Arc<ProjectSnapshot>> {
988 let worktree_snapshots: Vec<_> = project
989 .read(cx)
990 .visible_worktrees(cx)
991 .map(|worktree| Self::worktree_snapshot(worktree, cx))
992 .collect();
993
994 cx.spawn(async move |_, cx| {
995 let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
996
997 let mut unsaved_buffers = Vec::new();
998 cx.update(|app_cx| {
999 let buffer_store = project.read(app_cx).buffer_store();
1000 for buffer_handle in buffer_store.read(app_cx).buffers() {
1001 let buffer = buffer_handle.read(app_cx);
1002 if buffer.is_dirty() {
1003 if let Some(file) = buffer.file() {
1004 let path = file.path().to_string_lossy().to_string();
1005 unsaved_buffers.push(path);
1006 }
1007 }
1008 }
1009 })
1010 .ok();
1011
1012 Arc::new(ProjectSnapshot {
1013 worktree_snapshots,
1014 unsaved_buffer_paths: unsaved_buffers,
1015 timestamp: Utc::now(),
1016 })
1017 })
1018 }
1019
1020 fn worktree_snapshot(worktree: Entity<project::Worktree>, cx: &App) -> Task<WorktreeSnapshot> {
1021 cx.spawn(async move |cx| {
1022 // Get worktree path and snapshot
1023 let worktree_info = cx.update(|app_cx| {
1024 let worktree = worktree.read(app_cx);
1025 let path = worktree.abs_path().to_string_lossy().to_string();
1026 let snapshot = worktree.snapshot();
1027 (path, snapshot)
1028 });
1029
1030 let Ok((worktree_path, snapshot)) = worktree_info else {
1031 return WorktreeSnapshot {
1032 worktree_path: String::new(),
1033 git_state: None,
1034 };
1035 };
1036
1037 // Extract git information
1038 let git_state = match snapshot.repositories().first() {
1039 None => None,
1040 Some(repo_entry) => {
1041 // Get branch information
1042 let current_branch = repo_entry.branch().map(|branch| branch.name.to_string());
1043
1044 // Get repository info
1045 let repo_result = worktree.read_with(cx, |worktree, _cx| {
1046 if let project::Worktree::Local(local_worktree) = &worktree {
1047 local_worktree.get_local_repo(repo_entry).map(|local_repo| {
1048 let repo = local_repo.repo();
1049 (repo.remote_url("origin"), repo.head_sha(), repo.clone())
1050 })
1051 } else {
1052 None
1053 }
1054 });
1055
1056 match repo_result {
1057 Ok(Some((remote_url, head_sha, repository))) => {
1058 // Get diff asynchronously
1059 let diff = repository
1060 .diff(git::repository::DiffType::HeadToWorktree, cx.clone())
1061 .await
1062 .ok();
1063
1064 Some(GitState {
1065 remote_url,
1066 head_sha,
1067 current_branch,
1068 diff,
1069 })
1070 }
1071 Err(_) | Ok(None) => None,
1072 }
1073 }
1074 };
1075
1076 WorktreeSnapshot {
1077 worktree_path,
1078 git_state,
1079 }
1080 })
1081 }
1082
1083 pub fn to_markdown(&self) -> Result<String> {
1084 let mut markdown = Vec::new();
1085
1086 if let Some(summary) = self.summary() {
1087 writeln!(markdown, "# {summary}\n")?;
1088 };
1089
1090 for message in self.messages() {
1091 writeln!(
1092 markdown,
1093 "## {role}\n",
1094 role = match message.role {
1095 Role::User => "User",
1096 Role::Assistant => "Assistant",
1097 Role::System => "System",
1098 }
1099 )?;
1100 writeln!(markdown, "{}\n", message.text)?;
1101
1102 for tool_use in self.tool_uses_for_message(message.id) {
1103 writeln!(
1104 markdown,
1105 "**Use Tool: {} ({})**",
1106 tool_use.name, tool_use.id
1107 )?;
1108 writeln!(markdown, "```json")?;
1109 writeln!(
1110 markdown,
1111 "{}",
1112 serde_json::to_string_pretty(&tool_use.input)?
1113 )?;
1114 writeln!(markdown, "```")?;
1115 }
1116
1117 for tool_result in self.tool_results_for_message(message.id) {
1118 write!(markdown, "**Tool Results: {}", tool_result.tool_use_id)?;
1119 if tool_result.is_error {
1120 write!(markdown, " (Error)")?;
1121 }
1122
1123 writeln!(markdown, "**\n")?;
1124 writeln!(markdown, "{}", tool_result.content)?;
1125 }
1126 }
1127
1128 Ok(String::from_utf8_lossy(&markdown).to_string())
1129 }
1130
1131 pub fn review_edits_in_range(
1132 &mut self,
1133 buffer: Entity<language::Buffer>,
1134 buffer_range: Range<language::Anchor>,
1135 accept: bool,
1136 cx: &mut Context<Self>,
1137 ) -> Task<Result<()>> {
1138 self.action_log.update(cx, |action_log, cx| {
1139 action_log.review_edits_in_range(buffer, buffer_range, accept, cx)
1140 })
1141 }
1142
1143 pub fn action_log(&self) -> &Entity<ActionLog> {
1144 &self.action_log
1145 }
1146
1147 pub fn cumulative_token_usage(&self) -> TokenUsage {
1148 self.cumulative_token_usage.clone()
1149 }
1150}
1151
1152#[derive(Debug, Clone)]
1153pub enum ThreadError {
1154 PaymentRequired,
1155 MaxMonthlySpendReached,
1156 Message(SharedString),
1157}
1158
1159#[derive(Debug, Clone)]
1160pub enum ThreadEvent {
1161 ShowError(ThreadError),
1162 StreamedCompletion,
1163 StreamedAssistantText(MessageId, String),
1164 DoneStreaming,
1165 MessageAdded(MessageId),
1166 MessageEdited(MessageId),
1167 MessageDeleted(MessageId),
1168 SummaryChanged,
1169 UsePendingTools,
1170 ToolFinished {
1171 #[allow(unused)]
1172 tool_use_id: LanguageModelToolUseId,
1173 /// The pending tool use that corresponds to this tool.
1174 pending_tool_use: Option<PendingToolUse>,
1175 /// Whether the tool was canceled by the user.
1176 canceled: bool,
1177 },
1178}
1179
1180impl EventEmitter<ThreadEvent> for Thread {}
1181
1182struct PendingCompletion {
1183 id: usize,
1184 _task: Task<()>,
1185}