Rebuild recently opened threads for ACP (#36531)

Conrad Irwin created

Closes #ISSUE

Release Notes:

- N/A

Change summary

Cargo.lock                             |   1 
crates/agent2/Cargo.toml               |   1 
crates/agent2/src/agent.rs             |   6 
crates/agent2/src/history_store.rs     | 102 ++++++++++++++-------------
crates/agent2/src/tests/mod.rs         |   2 
crates/agent_ui/src/acp/thread_view.rs |  39 +++++++++-
crates/agent_ui/src/agent_panel.rs     |  21 ++++-
7 files changed, 109 insertions(+), 63 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -206,6 +206,7 @@ dependencies = [
  "collections",
  "context_server",
  "ctor",
+ "db",
  "editor",
  "env_logger 0.11.8",
  "fs",

crates/agent2/Cargo.toml 🔗

@@ -26,6 +26,7 @@ chrono.workspace = true
 cloud_llm_client.workspace = true
 collections.workspace = true
 context_server.workspace = true
+db.workspace = true
 fs.workspace = true
 futures.workspace = true
 git.workspace = true

crates/agent2/src/agent.rs 🔗

@@ -974,7 +974,7 @@ mod tests {
         .await;
         let project = Project::test(fs.clone(), [], cx).await;
         let context_store = cx.new(|cx| assistant_context::ContextStore::fake(project.clone(), cx));
-        let history_store = cx.new(|cx| HistoryStore::new(context_store, [], cx));
+        let history_store = cx.new(|cx| HistoryStore::new(context_store, cx));
         let agent = NativeAgent::new(
             project.clone(),
             history_store,
@@ -1032,7 +1032,7 @@ mod tests {
         fs.insert_tree("/", json!({ "a": {}  })).await;
         let project = Project::test(fs.clone(), [], cx).await;
         let context_store = cx.new(|cx| assistant_context::ContextStore::fake(project.clone(), cx));
-        let history_store = cx.new(|cx| HistoryStore::new(context_store, [], cx));
+        let history_store = cx.new(|cx| HistoryStore::new(context_store, cx));
         let connection = NativeAgentConnection(
             NativeAgent::new(
                 project.clone(),
@@ -1088,7 +1088,7 @@ mod tests {
         let project = Project::test(fs.clone(), [], cx).await;
 
         let context_store = cx.new(|cx| assistant_context::ContextStore::fake(project.clone(), cx));
-        let history_store = cx.new(|cx| HistoryStore::new(context_store, [], cx));
+        let history_store = cx.new(|cx| HistoryStore::new(context_store, cx));
 
         // Create the agent and connection
         let agent = NativeAgent::new(

crates/agent2/src/history_store.rs 🔗

@@ -3,6 +3,7 @@ use agent_client_protocol as acp;
 use anyhow::{Context as _, Result, anyhow};
 use assistant_context::SavedContextMetadata;
 use chrono::{DateTime, Utc};
+use db::kvp::KEY_VALUE_STORE;
 use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
 use itertools::Itertools;
 use paths::contexts_dir;
@@ -11,7 +12,7 @@ use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 use util::ResultExt as _;
 
 const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
-const NAVIGATION_HISTORY_PATH: &str = "agent-navigation-history.json";
+const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
 const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
 
 const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
@@ -53,12 +54,10 @@ pub enum HistoryEntryId {
     TextThread(Arc<Path>),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
 enum SerializedRecentOpen {
-    Thread(String),
-    ContextName(String),
-    /// Old format which stores the full path
-    Context(String),
+    AcpThread(String),
+    TextThread(String),
 }
 
 pub struct HistoryStore {
@@ -72,29 +71,26 @@ pub struct HistoryStore {
 impl HistoryStore {
     pub fn new(
         context_store: Entity<assistant_context::ContextStore>,
-        initial_recent_entries: impl IntoIterator<Item = HistoryEntryId>,
         cx: &mut Context<Self>,
     ) -> Self {
         let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
 
         cx.spawn(async move |this, cx| {
-            let entries = Self::load_recently_opened_entries(cx).await.log_err()?;
-            this.update(cx, |this, _| {
-                this.recently_opened_entries
-                    .extend(
-                        entries.into_iter().take(
-                            MAX_RECENTLY_OPENED_ENTRIES
-                                .saturating_sub(this.recently_opened_entries.len()),
-                        ),
-                    );
+            let entries = Self::load_recently_opened_entries(cx).await;
+            this.update(cx, |this, cx| {
+                if let Some(entries) = entries.log_err() {
+                    this.recently_opened_entries = entries;
+                }
+
+                this.reload(cx);
             })
-            .ok()
+            .ok();
         })
         .detach();
 
         Self {
             context_store,
-            recently_opened_entries: initial_recent_entries.into_iter().collect(),
+            recently_opened_entries: VecDeque::default(),
             threads: Vec::default(),
             _subscriptions: subscriptions,
             _save_recently_opened_entries_task: Task::ready(()),
@@ -134,6 +130,18 @@ impl HistoryStore {
                 .await?;
 
             this.update(cx, |this, cx| {
+                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
+                    for thread in threads
+                        .iter()
+                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
+                        .rev()
+                    {
+                        this.push_recently_opened_entry(
+                            HistoryEntryId::AcpThread(thread.id.clone()),
+                            cx,
+                        )
+                    }
+                }
                 this.threads = threads;
                 cx.notify();
             })
@@ -162,6 +170,16 @@ impl HistoryStore {
         history_entries
     }
 
+    pub fn is_empty(&self, cx: &App) -> bool {
+        self.threads.is_empty()
+            && self
+                .context_store
+                .read(cx)
+                .unordered_contexts()
+                .next()
+                .is_none()
+    }
+
     pub fn recent_entries(&self, limit: usize, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
         self.entries(cx).into_iter().take(limit).collect()
     }
@@ -215,58 +233,44 @@ impl HistoryStore {
             .iter()
             .filter_map(|entry| match entry {
                 HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
-                    SerializedRecentOpen::ContextName(file.to_string_lossy().to_string())
+                    SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
                 }),
-                HistoryEntryId::AcpThread(id) => Some(SerializedRecentOpen::Thread(id.to_string())),
+                HistoryEntryId::AcpThread(id) => {
+                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
+                }
             })
             .collect::<Vec<_>>();
 
         self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
+            let content = serde_json::to_string(&serialized_entries).unwrap();
             cx.background_executor()
                 .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
                 .await;
-            cx.background_spawn(async move {
-                let path = paths::data_dir().join(NAVIGATION_HISTORY_PATH);
-                let content = serde_json::to_string(&serialized_entries)?;
-                std::fs::write(path, content)?;
-                anyhow::Ok(())
-            })
-            .await
-            .log_err();
+            KEY_VALUE_STORE
+                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
+                .await
+                .log_err();
         });
     }
 
-    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<Vec<HistoryEntryId>>> {
+    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
         cx.background_spawn(async move {
-            let path = paths::data_dir().join(NAVIGATION_HISTORY_PATH);
-            let contents = match smol::fs::read_to_string(path).await {
-                Ok(it) => it,
-                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
-                    return Ok(Vec::new());
-                }
-                Err(e) => {
-                    return Err(e)
-                        .context("deserializing persisted agent panel navigation history");
-                }
-            };
-            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&contents)
+            let json = KEY_VALUE_STORE
+                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
+                .unwrap_or("[]".to_string());
+            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
                 .context("deserializing persisted agent panel navigation history")?
                 .into_iter()
                 .take(MAX_RECENTLY_OPENED_ENTRIES)
                 .flat_map(|entry| match entry {
-                    SerializedRecentOpen::Thread(id) => Some(HistoryEntryId::AcpThread(
+                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
                         acp::SessionId(id.as_str().into()),
                     )),
-                    SerializedRecentOpen::ContextName(file_name) => Some(
+                    SerializedRecentOpen::TextThread(file_name) => Some(
                         HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
                     ),
-                    SerializedRecentOpen::Context(path) => {
-                        Path::new(&path).file_name().map(|file_name| {
-                            HistoryEntryId::TextThread(contexts_dir().join(file_name).into())
-                        })
-                    }
                 })
-                .collect::<Vec<_>>();
+                .collect();
             Ok(entries)
         })
     }

crates/agent2/src/tests/mod.rs 🔗

@@ -1414,7 +1414,7 @@ async fn test_agent_connection(cx: &mut TestAppContext) {
     let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
     let cwd = Path::new("/test");
     let context_store = cx.new(|cx| assistant_context::ContextStore::fake(project.clone(), cx));
-    let history_store = cx.new(|cx| HistoryStore::new(context_store, [], cx));
+    let history_store = cx.new(|cx| HistoryStore::new(context_store, cx));
 
     // Create agent and connection
     let agent = NativeAgent::new(

crates/agent_ui/src/acp/thread_view.rs 🔗

@@ -9,7 +9,7 @@ use agent::{TextThreadStore, ThreadStore};
 use agent_client_protocol::{self as acp};
 use agent_servers::{AgentServer, ClaudeCode};
 use agent_settings::{AgentProfileId, AgentSettings, CompletionMode, NotifyWhenAgentWaiting};
-use agent2::DbThreadMetadata;
+use agent2::{DbThreadMetadata, HistoryEntryId, HistoryStore};
 use anyhow::bail;
 use audio::{Audio, Sound};
 use buffer_diff::BufferDiff;
@@ -111,6 +111,7 @@ pub struct AcpThreadView {
     workspace: WeakEntity<Workspace>,
     project: Entity<Project>,
     thread_state: ThreadState,
+    history_store: Entity<HistoryStore>,
     entry_view_state: Entity<EntryViewState>,
     message_editor: Entity<MessageEditor>,
     model_selector: Option<Entity<AcpModelSelectorPopover>>,
@@ -159,6 +160,7 @@ impl AcpThreadView {
         resume_thread: Option<DbThreadMetadata>,
         workspace: WeakEntity<Workspace>,
         project: Entity<Project>,
+        history_store: Entity<HistoryStore>,
         thread_store: Entity<ThreadStore>,
         text_thread_store: Entity<TextThreadStore>,
         window: &mut Window,
@@ -223,6 +225,7 @@ impl AcpThreadView {
             plan_expanded: false,
             editor_expanded: false,
             terminal_expanded: true,
+            history_store,
             _subscriptions: subscriptions,
             _cancel_task: None,
         }
@@ -260,7 +263,7 @@ impl AcpThreadView {
             let result = if let Some(native_agent) = connection
                 .clone()
                 .downcast::<agent2::NativeAgentConnection>()
-                && let Some(resume) = resume_thread
+                && let Some(resume) = resume_thread.clone()
             {
                 cx.update(|_, cx| {
                     native_agent
@@ -313,6 +316,15 @@ impl AcpThreadView {
                             }
                         });
 
+                        if let Some(resume) = resume_thread {
+                            this.history_store.update(cx, |history, cx| {
+                                history.push_recently_opened_entry(
+                                    HistoryEntryId::AcpThread(resume.id),
+                                    cx,
+                                );
+                            });
+                        }
+
                         AgentDiff::set_active_thread(&workspace, thread.clone(), window, cx);
 
                         this.model_selector =
@@ -555,9 +567,15 @@ impl AcpThreadView {
     }
 
     fn send(&mut self, window: &mut Window, cx: &mut Context<Self>) {
-        if let Some(thread) = self.thread()
-            && thread.read(cx).status() != ThreadStatus::Idle
-        {
+        let Some(thread) = self.thread() else { return };
+        self.history_store.update(cx, |history, cx| {
+            history.push_recently_opened_entry(
+                HistoryEntryId::AcpThread(thread.read(cx).session_id().clone()),
+                cx,
+            );
+        });
+
+        if thread.read(cx).status() != ThreadStatus::Idle {
             self.stop_current_and_send_new_message(window, cx);
             return;
         }
@@ -3942,6 +3960,7 @@ pub(crate) mod tests {
     use acp_thread::StubAgentConnection;
     use agent::{TextThreadStore, ThreadStore};
     use agent_client_protocol::SessionId;
+    use assistant_context::ContextStore;
     use editor::EditorSettings;
     use fs::FakeFs;
     use gpui::{EventEmitter, SemanticVersion, TestAppContext, VisualTestContext};
@@ -4079,6 +4098,10 @@ pub(crate) mod tests {
             cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx)));
         let text_thread_store =
             cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx)));
+        let context_store =
+            cx.update(|_window, cx| cx.new(|cx| ContextStore::fake(project.clone(), cx)));
+        let history_store =
+            cx.update(|_window, cx| cx.new(|cx| HistoryStore::new(context_store, cx)));
 
         let thread_view = cx.update(|window, cx| {
             cx.new(|cx| {
@@ -4087,6 +4110,7 @@ pub(crate) mod tests {
                     None,
                     workspace.downgrade(),
                     project,
+                    history_store,
                     thread_store.clone(),
                     text_thread_store.clone(),
                     window,
@@ -4283,6 +4307,10 @@ pub(crate) mod tests {
             cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx)));
         let text_thread_store =
             cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx)));
+        let context_store =
+            cx.update(|_window, cx| cx.new(|cx| ContextStore::fake(project.clone(), cx)));
+        let history_store =
+            cx.update(|_window, cx| cx.new(|cx| HistoryStore::new(context_store, cx)));
 
         let connection = Rc::new(StubAgentConnection::new());
         let thread_view = cx.update(|window, cx| {
@@ -4292,6 +4320,7 @@ pub(crate) mod tests {
                     None,
                     workspace.downgrade(),
                     project.clone(),
+                    history_store.clone(),
                     thread_store.clone(),
                     text_thread_store.clone(),
                     window,

crates/agent_ui/src/agent_panel.rs 🔗

@@ -648,8 +648,7 @@ impl AgentPanel {
             )
         });
 
-        let acp_history_store =
-            cx.new(|cx| agent2::HistoryStore::new(context_store.clone(), [], cx));
+        let acp_history_store = cx.new(|cx| agent2::HistoryStore::new(context_store.clone(), cx));
         let acp_history = cx.new(|cx| AcpThreadHistory::new(acp_history_store.clone(), window, cx));
         cx.subscribe_in(
             &acp_history,
@@ -1073,6 +1072,7 @@ impl AgentPanel {
                         resume_thread,
                         workspace.clone(),
                         project,
+                        this.acp_history_store.clone(),
                         thread_store.clone(),
                         text_thread_store.clone(),
                         window,
@@ -1609,6 +1609,14 @@ impl AgentPanel {
                     if let Some(path) = context_editor.read(cx).context().read(cx).path() {
                         store.push_recently_opened_entry(HistoryEntryId::Context(path.clone()), cx)
                     }
+                });
+                self.acp_history_store.update(cx, |store, cx| {
+                    if let Some(path) = context_editor.read(cx).context().read(cx).path() {
+                        store.push_recently_opened_entry(
+                            agent2::HistoryEntryId::TextThread(path.clone()),
+                            cx,
+                        )
+                    }
                 })
             }
             ActiveView::ExternalAgentThread { .. } => {}
@@ -2763,9 +2771,12 @@ impl AgentPanel {
                 false
             }
             _ => {
-                let history_is_empty = self
-                    .history_store
-                    .update(cx, |store, cx| store.recent_entries(1, cx).is_empty());
+                let history_is_empty = if cx.has_flag::<AcpFeatureFlag>() {
+                    self.acp_history_store.read(cx).is_empty(cx)
+                } else {
+                    self.history_store
+                        .update(cx, |store, cx| store.recent_entries(1, cx).is_empty())
+                };
 
                 let has_configured_non_zed_providers = LanguageModelRegistry::read_global(cx)
                     .providers()