From 8998cdee2647977dc7acbc6b4ccd8a98b3074069 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Mon, 18 Aug 2025 21:03:31 -0600 Subject: [PATCH] Wire through title update --- crates/acp_thread/src/acp_thread.rs | 1 + crates/agent2/src/agent.rs | 19 +++-- crates/agent2/src/history_store.rs | 1 + crates/agent2/src/thread.rs | 112 ++++++++++++++++------------ 4 files changed, 82 insertions(+), 51 deletions(-) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index bb9c2e35eab699bf1ae3b5b0b9cb064988fa6523..58e171046f69bd82cbe2a4bdb71e8dff58b143f5 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -937,6 +937,7 @@ impl AcpThread { } pub fn update_title(&mut self, title: SharedString, cx: &mut Context) -> Result<()> { + dbg!("update title", &title); self.title = title; cx.emit(AcpThreadEvent::TitleUpdated); Ok(()) diff --git a/crates/agent2/src/agent.rs b/crates/agent2/src/agent.rs index be0bac047f61509cdf2d103f144d40f50d86c109..564511c632784bcdeb67ba551005a7c207708d9a 100644 --- a/crates/agent2/src/agent.rs +++ b/crates/agent2/src/agent.rs @@ -244,20 +244,28 @@ impl NativeAgent { cx: &mut Context, ) { let id = thread.read(cx).id().clone(); + let weak_thread = acp_thread.downgrade(); self.sessions.insert( id, Session { thread: thread.clone(), - acp_thread: acp_thread.downgrade(), + acp_thread: weak_thread.clone(), save_task: Task::ready(Ok(())), _subscriptions: vec![ cx.observe_release(&acp_thread, |this, acp_thread, _cx| { this.sessions.remove(acp_thread.session_id()); }), - cx.observe(&thread, |this, thread, cx| { - thread.update(cx, |thread, cx| { - thread.generate_title_if_needed(cx); - }); + cx.observe(&thread, move |this, thread, cx| { + if let Some(response_stream) = + thread.update(cx, |thread, cx| thread.generate_title_if_needed(cx)) + { + NativeAgentConnection::handle_thread_events( + response_stream, + weak_thread.clone(), + cx, + ) + .detach_and_log_err(cx); + } this.save_thread(thread.clone(), cx) }), ], @@ -659,6 +667,7 @@ impl NativeAgentConnection { })??; } ThreadEvent::TitleUpdate(title) => { + dbg!("updating title"); acp_thread .update(cx, |thread, cx| thread.update_title(title, cx))??; } diff --git a/crates/agent2/src/history_store.rs b/crates/agent2/src/history_store.rs index 0622dd4f5898ca213fd2516841bc4cf005788c2a..151cc4e8a9aa37e3e2b7707846fb6a7db92fabcd 100644 --- a/crates/agent2/src/history_store.rs +++ b/crates/agent2/src/history_store.rs @@ -87,6 +87,7 @@ impl HistoryStore { let history = AgentHistory { entries: history.clone(), _task: cx.spawn(async move |this, cx| { + dbg!("loaded", history.borrow().as_ref().map(|b| b.len())); while history.changed().await.is_ok() { this.update(cx, |_, cx| cx.notify()).ok(); } diff --git a/crates/agent2/src/thread.rs b/crates/agent2/src/thread.rs index 93a6fad23a67795801f74901085082177f6fc32b..639c50957e0fd685d0de400bc245217736c29ef4 100644 --- a/crates/agent2/src/thread.rs +++ b/crates/agent2/src/thread.rs @@ -458,7 +458,7 @@ pub struct ToolCallAuthorization { enum ThreadTitle { None, - Pending(Task<()>), + Pending(Shared>), Done(Result), } @@ -1082,24 +1082,33 @@ impl Thread { Ok(events_rx) } - pub fn generate_title_if_needed(&mut self, cx: &mut Context) { + pub fn generate_title_if_needed( + &mut self, + cx: &mut Context, + ) -> Option>> { if !matches!(self.title, ThreadTitle::None) { - return; + return None; } // todo!() copy logic from agent1 re: tool calls, etc.? if self.messages.len() < 2 { - return; + return None; } + let Some(model) = self.summarization_model.clone() else { + return None; + }; + let (tx, rx) = mpsc::unbounded(); - self.generate_title(cx); + self.generate_title(model, ThreadEventStream(tx), cx); + Some(rx) } - fn generate_title(&mut self, cx: &mut Context) { - let Some(model) = self.summarization_model.clone() else { - println!("No thread summary model"); - return; - }; + fn generate_title( + &mut self, + model: Arc, + event_stream: ThreadEventStream, + cx: &mut Context, + ) { let mut request = LanguageModelRequest { intent: Some(CompletionIntent::ThreadSummarization), temperature: AgentSettings::temperature_for_model(&model, cx), @@ -1116,50 +1125,55 @@ impl Thread { cache: false, }); - let task = cx.spawn(async move |this, cx| { - let result = async { - let mut messages = model.stream_completion(request, &cx).await?; - - let mut new_summary = String::new(); - while let Some(event) = messages.next().await { - let Ok(event) = event else { - continue; - }; - let text = match event { - LanguageModelCompletionEvent::Text(text) => text, - LanguageModelCompletionEvent::StatusUpdate( - CompletionRequestStatus::UsageUpdated { .. }, - ) => { - // this.update(cx, |thread, cx| { - // thread.update_model_request_usage(amount as u32, limit, cx); - // })?; - // todo!()? not sure if this is the right place to do this. + let task = cx + .spawn(async move |this, cx| { + let result: anyhow::Result = async { + let mut messages = model.stream_completion(request, &cx).await?; + + let mut new_summary = String::new(); + while let Some(event) = messages.next().await { + let Ok(event) = event else { continue; - } - _ => continue, - }; + }; + let text = match event { + LanguageModelCompletionEvent::Text(text) => text, + LanguageModelCompletionEvent::StatusUpdate( + CompletionRequestStatus::UsageUpdated { .. }, + ) => { + // this.update(cx, |thread, cx| { + // thread.update_model_request_usage(amount as u32, limit, cx); + // })?; + // todo!()? not sure if this is the right place to do this. + continue; + } + _ => continue, + }; - let mut lines = text.lines(); - new_summary.extend(lines.next()); + let mut lines = text.lines(); + new_summary.extend(lines.next()); - // Stop if the LLM generated multiple lines. - if lines.next().is_some() { - break; + // Stop if the LLM generated multiple lines. + if lines.next().is_some() { + break; + } } - } - anyhow::Ok(new_summary.into()) - } - .await; + anyhow::Ok(new_summary.into()) + } + .await; - this.update(cx, |this, cx| { - this.title = ThreadTitle::Done(result); - cx.notify(); + this.update(cx, |this, cx| { + if let Ok(title) = &result { + event_stream.send_title_update(title.clone()); + } + this.title = ThreadTitle::Done(result); + cx.notify(); + }) + .log_err(); }) - .log_err(); - }); + .shared(); - self.title = ThreadTitle::Pending(task); + self.title = ThreadTitle::Pending(task.clone()); cx.notify() } @@ -1746,6 +1760,12 @@ impl ThreadEventStream { .ok(); } + fn send_title_update(&self, text: SharedString) { + self.0 + .unbounded_send(Ok(ThreadEvent::TitleUpdate(text))) + .ok(); + } + fn send_thinking(&self, text: &str) { self.0 .unbounded_send(Ok(ThreadEvent::AgentThinking(text.to_string())))