Wire through title update

Conrad Irwin created

Change summary

crates/acp_thread/src/acp_thread.rs |   1 
crates/agent2/src/agent.rs          |  19 +++-
crates/agent2/src/history_store.rs  |   1 
crates/agent2/src/thread.rs         | 112 ++++++++++++++++++------------
4 files changed, 82 insertions(+), 51 deletions(-)

Detailed changes

crates/acp_thread/src/acp_thread.rs 🔗

@@ -937,6 +937,7 @@ impl AcpThread {
     }
 
     pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
+        dbg!("update title", &title);
         self.title = title;
         cx.emit(AcpThreadEvent::TitleUpdated);
         Ok(())

crates/agent2/src/agent.rs 🔗

@@ -244,20 +244,28 @@ impl NativeAgent {
         cx: &mut Context<Self>,
     ) {
         let id = thread.read(cx).id().clone();
+        let weak_thread = acp_thread.downgrade();
         self.sessions.insert(
             id,
             Session {
                 thread: thread.clone(),
-                acp_thread: acp_thread.downgrade(),
+                acp_thread: weak_thread.clone(),
                 save_task: Task::ready(Ok(())),
                 _subscriptions: vec![
                     cx.observe_release(&acp_thread, |this, acp_thread, _cx| {
                         this.sessions.remove(acp_thread.session_id());
                     }),
-                    cx.observe(&thread, |this, thread, cx| {
-                        thread.update(cx, |thread, cx| {
-                            thread.generate_title_if_needed(cx);
-                        });
+                    cx.observe(&thread, move |this, thread, cx| {
+                        if let Some(response_stream) =
+                            thread.update(cx, |thread, cx| thread.generate_title_if_needed(cx))
+                        {
+                            NativeAgentConnection::handle_thread_events(
+                                response_stream,
+                                weak_thread.clone(),
+                                cx,
+                            )
+                            .detach_and_log_err(cx);
+                        }
                         this.save_thread(thread.clone(), cx)
                     }),
                 ],
@@ -659,6 +667,7 @@ impl NativeAgentConnection {
                                 })??;
                             }
                             ThreadEvent::TitleUpdate(title) => {
+                                dbg!("updating title");
                                 acp_thread
                                     .update(cx, |thread, cx| thread.update_title(title, cx))??;
                             }

crates/agent2/src/history_store.rs 🔗

@@ -87,6 +87,7 @@ impl HistoryStore {
         let history = AgentHistory {
             entries: history.clone(),
             _task: cx.spawn(async move |this, cx| {
+                dbg!("loaded", history.borrow().as_ref().map(|b| b.len()));
                 while history.changed().await.is_ok() {
                     this.update(cx, |_, cx| cx.notify()).ok();
                 }

crates/agent2/src/thread.rs 🔗

@@ -458,7 +458,7 @@ pub struct ToolCallAuthorization {
 
 enum ThreadTitle {
     None,
-    Pending(Task<()>),
+    Pending(Shared<Task<()>>),
     Done(Result<SharedString>),
 }
 
@@ -1082,24 +1082,33 @@ impl Thread {
         Ok(events_rx)
     }
 
-    pub fn generate_title_if_needed(&mut self, cx: &mut Context<Self>) {
+    pub fn generate_title_if_needed(
+        &mut self,
+        cx: &mut Context<Self>,
+    ) -> Option<mpsc::UnboundedReceiver<Result<ThreadEvent>>> {
         if !matches!(self.title, ThreadTitle::None) {
-            return;
+            return None;
         }
 
         // todo!() copy logic from agent1 re: tool calls, etc.?
         if self.messages.len() < 2 {
-            return;
+            return None;
         }
+        let Some(model) = self.summarization_model.clone() else {
+            return None;
+        };
+        let (tx, rx) = mpsc::unbounded();
 
-        self.generate_title(cx);
+        self.generate_title(model, ThreadEventStream(tx), cx);
+        Some(rx)
     }
 
-    fn generate_title(&mut self, cx: &mut Context<Self>) {
-        let Some(model) = self.summarization_model.clone() else {
-            println!("No thread summary model");
-            return;
-        };
+    fn generate_title(
+        &mut self,
+        model: Arc<dyn LanguageModel>,
+        event_stream: ThreadEventStream,
+        cx: &mut Context<Self>,
+    ) {
         let mut request = LanguageModelRequest {
             intent: Some(CompletionIntent::ThreadSummarization),
             temperature: AgentSettings::temperature_for_model(&model, cx),
@@ -1116,50 +1125,55 @@ impl Thread {
             cache: false,
         });
 
-        let task = cx.spawn(async move |this, cx| {
-            let result = async {
-                let mut messages = model.stream_completion(request, &cx).await?;
-
-                let mut new_summary = String::new();
-                while let Some(event) = messages.next().await {
-                    let Ok(event) = event else {
-                        continue;
-                    };
-                    let text = match event {
-                        LanguageModelCompletionEvent::Text(text) => text,
-                        LanguageModelCompletionEvent::StatusUpdate(
-                            CompletionRequestStatus::UsageUpdated { .. },
-                        ) => {
-                            // this.update(cx, |thread, cx| {
-                            //     thread.update_model_request_usage(amount as u32, limit, cx);
-                            // })?;
-                            // todo!()? not sure if this is the right place to do this.
+        let task = cx
+            .spawn(async move |this, cx| {
+                let result: anyhow::Result<SharedString> = async {
+                    let mut messages = model.stream_completion(request, &cx).await?;
+
+                    let mut new_summary = String::new();
+                    while let Some(event) = messages.next().await {
+                        let Ok(event) = event else {
                             continue;
-                        }
-                        _ => continue,
-                    };
+                        };
+                        let text = match event {
+                            LanguageModelCompletionEvent::Text(text) => text,
+                            LanguageModelCompletionEvent::StatusUpdate(
+                                CompletionRequestStatus::UsageUpdated { .. },
+                            ) => {
+                                // this.update(cx, |thread, cx| {
+                                //     thread.update_model_request_usage(amount as u32, limit, cx);
+                                // })?;
+                                // todo!()? not sure if this is the right place to do this.
+                                continue;
+                            }
+                            _ => continue,
+                        };
 
-                    let mut lines = text.lines();
-                    new_summary.extend(lines.next());
+                        let mut lines = text.lines();
+                        new_summary.extend(lines.next());
 
-                    // Stop if the LLM generated multiple lines.
-                    if lines.next().is_some() {
-                        break;
+                        // Stop if the LLM generated multiple lines.
+                        if lines.next().is_some() {
+                            break;
+                        }
                     }
-                }
 
-                anyhow::Ok(new_summary.into())
-            }
-            .await;
+                    anyhow::Ok(new_summary.into())
+                }
+                .await;
 
-            this.update(cx, |this, cx| {
-                this.title = ThreadTitle::Done(result);
-                cx.notify();
+                this.update(cx, |this, cx| {
+                    if let Ok(title) = &result {
+                        event_stream.send_title_update(title.clone());
+                    }
+                    this.title = ThreadTitle::Done(result);
+                    cx.notify();
+                })
+                .log_err();
             })
-            .log_err();
-        });
+            .shared();
 
-        self.title = ThreadTitle::Pending(task);
+        self.title = ThreadTitle::Pending(task.clone());
         cx.notify()
     }
 
@@ -1746,6 +1760,12 @@ impl ThreadEventStream {
             .ok();
     }
 
+    fn send_title_update(&self, text: SharedString) {
+        self.0
+            .unbounded_send(Ok(ThreadEvent::TitleUpdate(text)))
+            .ok();
+    }
+
     fn send_thinking(&self, text: &str) {
         self.0
             .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))