assistant2: Prevent concurrent thread saving tasks (#26089)

Marshall Bowers created

This PR makes it so only one thread-saving task will be in flight at a
time.

Release Notes:

- N/A

Change summary

crates/assistant2/src/active_thread.rs | 52 ++++++++++++++-------------
1 file changed, 27 insertions(+), 25 deletions(-)

Detailed changes

crates/assistant2/src/active_thread.rs 🔗

@@ -6,7 +6,7 @@ use editor::{Editor, MultiBuffer};
 use gpui::{
     list, AbsoluteLength, AnyElement, App, ClickEvent, DefiniteLength, EdgesRefinement, Empty,
     Entity, Focusable, Length, ListAlignment, ListOffset, ListState, StyleRefinement, Subscription,
-    TextStyleRefinement, UnderlineStyle, WeakEntity,
+    Task, TextStyleRefinement, UnderlineStyle, WeakEntity,
 };
 use language::{Buffer, LanguageRegistry};
 use language_model::{LanguageModelRegistry, LanguageModelToolUseId, Role};
@@ -14,6 +14,7 @@ use markdown::{Markdown, MarkdownStyle};
 use settings::Settings as _;
 use theme::ThemeSettings;
 use ui::{prelude::*, Disclosure, KeyBinding};
+use util::ResultExt as _;
 use workspace::Workspace;
 
 use crate::thread::{MessageId, RequestKind, Thread, ThreadError, ThreadEvent};
@@ -27,6 +28,7 @@ pub struct ActiveThread {
     tools: Arc<ToolWorkingSet>,
     thread_store: Entity<ThreadStore>,
     thread: Entity<Thread>,
+    save_thread_task: Option<Task<()>>,
     messages: Vec<MessageId>,
     list_state: ListState,
     rendered_messages_by_id: HashMap<MessageId, Entity<Markdown>>,
@@ -61,6 +63,7 @@ impl ActiveThread {
             tools,
             thread_store,
             thread: thread.clone(),
+            save_thread_task: None,
             messages: Vec::new(),
             rendered_messages_by_id: HashMap::default(),
             expanded_tool_uses: HashMap::default(),
@@ -250,11 +253,7 @@ impl ActiveThread {
                 self.last_error = Some(error.clone());
             }
             ThreadEvent::StreamedCompletion | ThreadEvent::SummaryChanged => {
-                self.thread_store
-                    .update(cx, |thread_store, cx| {
-                        thread_store.save_thread(&self.thread, cx)
-                    })
-                    .detach_and_log_err(cx);
+                self.save_thread(cx);
             }
             ThreadEvent::StreamedAssistantText(message_id, text) => {
                 if let Some(markdown) = self.rendered_messages_by_id.get_mut(&message_id) {
@@ -273,12 +272,7 @@ impl ActiveThread {
                     self.push_message(message_id, message_text, window, cx);
                 }
 
-                self.thread_store
-                    .update(cx, |thread_store, cx| {
-                        thread_store.save_thread(&self.thread, cx)
-                    })
-                    .detach_and_log_err(cx);
-
+                self.save_thread(cx);
                 cx.notify();
             }
             ThreadEvent::MessageEdited(message_id) => {
@@ -291,23 +285,12 @@ impl ActiveThread {
                     self.edited_message(message_id, message_text, window, cx);
                 }
 
-                self.thread_store
-                    .update(cx, |thread_store, cx| {
-                        thread_store.save_thread(&self.thread, cx)
-                    })
-                    .detach_and_log_err(cx);
-
+                self.save_thread(cx);
                 cx.notify();
             }
             ThreadEvent::MessageDeleted(message_id) => {
                 self.deleted_message(message_id);
-
-                self.thread_store
-                    .update(cx, |thread_store, cx| {
-                        thread_store.save_thread(&self.thread, cx)
-                    })
-                    .detach_and_log_err(cx);
-
+                self.save_thread(cx);
                 cx.notify();
             }
             ThreadEvent::UsePendingTools => {
@@ -358,6 +341,25 @@ impl ActiveThread {
         }
     }
 
+    /// Spawns a task to save the active thread.
+    ///
+    /// Only one task to save the thread will be in flight at a time.
+    fn save_thread(&mut self, cx: &mut Context<Self>) {
+        let thread = self.thread.clone();
+        self.save_thread_task = Some(cx.spawn(|this, mut cx| async move {
+            let task = this
+                .update(&mut cx, |this, cx| {
+                    this.thread_store
+                        .update(cx, |thread_store, cx| thread_store.save_thread(&thread, cx))
+                })
+                .ok();
+
+            if let Some(task) = task {
+                task.await.log_err();
+            }
+        }));
+    }
+
     fn start_editing_message(
         &mut self,
         message_id: MessageId,