Re-wire history

Conrad Irwin created

Change summary

crates/acp_thread/src/acp_thread.rs       |   1 
crates/acp_thread/src/connection.rs       |  36 +-
crates/agent2/src/agent.rs                | 261 ++++++++++++------------
crates/agent2/src/db.rs                   |  28 ++
crates/agent2/src/history_store.rs        |  96 +++++++--
crates/agent_ui/src/acp/thread_history.rs |  33 --
crates/agent_ui/src/acp/thread_view.rs    |  47 +++
crates/agent_ui/src/agent_panel.rs        |   2 
8 files changed, 294 insertions(+), 210 deletions(-)

Detailed changes

crates/acp_thread/src/acp_thread.rs 🔗

@@ -937,7 +937,6 @@ impl AcpThread {
     }
 
     pub fn update_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Result<()> {
-        dbg!("update title", &title);
         self.title = title;
         cx.emit(AcpThreadEvent::TitleUpdated);
         Ok(())

crates/acp_thread/src/connection.rs 🔗

@@ -2,6 +2,7 @@ use crate::{AcpThread, AcpThreadMetadata};
 use agent_client_protocol::{self as acp};
 use anyhow::Result;
 use collections::IndexMap;
+use futures::channel::mpsc::UnboundedReceiver;
 use gpui::{Entity, SharedString, Task};
 use project::Project;
 use serde::{Deserialize, Serialize};
@@ -26,25 +27,6 @@ pub trait AgentConnection {
         cx: &mut App,
     ) -> Task<Result<Entity<AcpThread>>>;
 
-    // todo!(expose a history trait, and include list_threads and load_thread)
-    // todo!(write a test)
-    fn list_threads(
-        &self,
-        _cx: &mut App,
-    ) -> Option<watch::Receiver<Option<Vec<AcpThreadMetadata>>>> {
-        return None;
-    }
-
-    fn load_thread(
-        self: Rc<Self>,
-        _project: Entity<Project>,
-        _cwd: &Path,
-        _session_id: acp::SessionId,
-        _cx: &mut App,
-    ) -> Task<Result<Entity<AcpThread>>> {
-        Task::ready(Err(anyhow::anyhow!("load thread not implemented")))
-    }
-
     fn auth_methods(&self) -> &[acp::AuthMethod];
 
     fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>>;
@@ -82,6 +64,10 @@ pub trait AgentConnection {
         None
     }
 
+    fn history(self: Rc<Self>) -> Option<Rc<dyn AgentHistory>> {
+        None
+    }
+
     fn into_any(self: Rc<Self>) -> Rc<dyn Any>;
 }
 
@@ -99,6 +85,18 @@ pub trait AgentSessionResume {
     fn run(&self, cx: &mut App) -> Task<Result<acp::PromptResponse>>;
 }
 
+pub trait AgentHistory {
+    fn list_threads(&self, cx: &mut App) -> Task<Result<Vec<AcpThreadMetadata>>>;
+    fn observe_history(&self, cx: &mut App) -> UnboundedReceiver<AcpThreadMetadata>;
+    fn load_thread(
+        self: Rc<Self>,
+        _project: Entity<Project>,
+        _cwd: &Path,
+        _session_id: acp::SessionId,
+        _cx: &mut App,
+    ) -> Task<Result<Entity<AcpThread>>>;
+}
+
 #[derive(Debug)]
 pub struct AuthRequired;
 

crates/agent2/src/agent.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
     UserMessageContent, WebSearchTool, templates::Templates,
 };
 use crate::{ThreadsDatabase, generate_session_id};
-use acp_thread::{AcpThread, AcpThreadMetadata, AgentModelSelector};
+use acp_thread::{AcpThread, AcpThreadMetadata, AgentHistory, AgentModelSelector};
 use agent_client_protocol as acp;
 use agent_settings::AgentSettings;
 use anyhow::{Context as _, Result, anyhow};
@@ -56,7 +56,7 @@ struct Session {
     thread: Entity<Thread>,
     /// The ACP thread that handles protocol communication
     acp_thread: WeakEntity<acp_thread::AcpThread>,
-    save_task: Task<Result<()>>,
+    save_task: Task<()>,
     _subscriptions: Vec<Subscription>,
 }
 
@@ -173,8 +173,7 @@ pub struct NativeAgent {
     project: Entity<Project>,
     prompt_store: Option<Entity<PromptStore>>,
     thread_database: Arc<ThreadsDatabase>,
-    history: watch::Sender<Option<Vec<AcpThreadMetadata>>>,
-    load_history: Task<()>,
+    history_watchers: Vec<mpsc::UnboundedSender<AcpThreadMetadata>>,
     fs: Arc<dyn Fs>,
     _subscriptions: Vec<Subscription>,
 }
@@ -212,7 +211,7 @@ impl NativeAgent {
 
             let (project_context_needs_refresh_tx, project_context_needs_refresh_rx) =
                 watch::channel(());
-            let mut this = Self {
+            Self {
                 sessions: HashMap::new(),
                 project_context: Rc::new(RefCell::new(project_context)),
                 project_context_needs_refresh: project_context_needs_refresh_tx,
@@ -228,12 +227,9 @@ impl NativeAgent {
                 project,
                 prompt_store,
                 fs,
-                history: watch::channel(None).0,
-                load_history: Task::ready(()),
+                history_watchers: Vec::new(),
                 _subscriptions: subscriptions,
-            };
-            this.reload_history(cx);
-            this
+            }
         })
     }
 
@@ -250,7 +246,7 @@ impl NativeAgent {
             Session {
                 thread: thread.clone(),
                 acp_thread: weak_thread.clone(),
-                save_task: Task::ready(Ok(())),
+                save_task: Task::ready(()),
                 _subscriptions: vec![
                     cx.observe_release(&acp_thread, |this, acp_thread, _cx| {
                         this.sessions.remove(acp_thread.session_id());
@@ -285,35 +281,23 @@ impl NativeAgent {
         session.save_task = cx.spawn(async move |this, cx| {
             cx.background_executor().timer(SAVE_THREAD_DEBOUNCE).await;
 
-            let db_thread = thread.update(cx, |thread, cx| thread.to_db(cx))?.await;
-            thread_database.save_thread(id, db_thread).await?;
-            this.update(cx, |this, cx| this.reload_history(cx))?;
-            Ok(())
-        });
-    }
-
-    fn reload_history(&mut self, cx: &mut Context<Self>) {
-        let thread_database = self.thread_database.clone();
-        self.load_history = cx.spawn(async move |this, cx| {
-            let results = cx
-                .background_spawn(async move {
-                    let results = thread_database.list_threads().await?;
-                    anyhow::Ok(
-                        results
-                            .into_iter()
-                            .map(|thread| AcpThreadMetadata {
-                                agent: NATIVE_AGENT_SERVER_NAME.clone(),
-                                id: thread.id.into(),
-                                title: thread.title,
-                                updated_at: thread.updated_at,
-                            })
-                            .collect(),
-                    )
+            let Some(task) = thread.update(cx, |thread, cx| thread.to_db(cx)).ok() else {
+                return;
+            };
+            let db_thread = task.await;
+            let metadata = thread_database
+                .save_thread(id.clone(), db_thread)
+                .await
+                .log_err();
+            if let Some(metadata) = metadata {
+                this.update(cx, |this, _| {
+                    for watcher in this.history_watchers.iter_mut() {
+                        watcher
+                            .unbounded_send(metadata.clone().to_acp(NATIVE_AGENT_SERVER_NAME))
+                            .log_err();
+                    }
                 })
-                .await;
-            if let Some(results) = results.log_err() {
-                this.update(cx, |this, _| this.history.send(Some(results)))
-                    .ok();
+                .ok();
             }
         });
     }
@@ -667,7 +651,6 @@ impl NativeAgentConnection {
                                 })??;
                             }
                             ThreadEvent::TitleUpdate(title) => {
-                                dbg!("updating title");
                                 acp_thread
                                     .update(cx, |thread, cx| thread.update_title(title, cx))??;
                             }
@@ -884,11 +867,106 @@ impl acp_thread::AgentConnection for NativeAgentConnection {
         Task::ready(Ok(()))
     }
 
-    fn list_threads(
+    fn model_selector(&self) -> Option<Rc<dyn AgentModelSelector>> {
+        Some(Rc::new(self.clone()) as Rc<dyn AgentModelSelector>)
+    }
+
+    fn prompt(
+        &self,
+        id: Option<acp_thread::UserMessageId>,
+        params: acp::PromptRequest,
+        cx: &mut App,
+    ) -> Task<Result<acp::PromptResponse>> {
+        let id = id.expect("UserMessageId is required");
+        let session_id = params.session_id.clone();
+        log::info!("Received prompt request for session: {}", session_id);
+        log::debug!("Prompt blocks count: {}", params.prompt.len());
+
+        self.run_turn(session_id, cx, |thread, cx| {
+            let content: Vec<UserMessageContent> = params
+                .prompt
+                .into_iter()
+                .map(Into::into)
+                .collect::<Vec<_>>();
+            log::info!("Converted prompt to message: {} chars", content.len());
+            log::debug!("Message id: {:?}", id);
+            log::debug!("Message content: {:?}", content);
+
+            thread.update(cx, |thread, cx| thread.send(id, content, cx))
+        })
+    }
+
+    fn resume(
+        &self,
+        session_id: &acp::SessionId,
+        _cx: &mut App,
+    ) -> Option<Rc<dyn acp_thread::AgentSessionResume>> {
+        Some(Rc::new(NativeAgentSessionResume {
+            connection: self.clone(),
+            session_id: session_id.clone(),
+        }) as _)
+    }
+
+    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
+        log::info!("Cancelling on session: {}", session_id);
+        self.0.update(cx, |agent, cx| {
+            if let Some(agent) = agent.sessions.get(session_id) {
+                agent.thread.update(cx, |thread, cx| thread.cancel(cx));
+            }
+        });
+    }
+
+    fn session_editor(
         &self,
+        session_id: &agent_client_protocol::SessionId,
         cx: &mut App,
-    ) -> Option<watch::Receiver<Option<Vec<AcpThreadMetadata>>>> {
-        Some(self.0.read(cx).history.receiver())
+    ) -> Option<Rc<dyn acp_thread::AgentSessionEditor>> {
+        self.0.update(cx, |agent, _cx| {
+            agent
+                .sessions
+                .get(session_id)
+                .map(|session| Rc::new(NativeAgentSessionEditor(session.thread.clone())) as _)
+        })
+    }
+
+    fn history(self: Rc<Self>) -> Option<Rc<dyn AgentHistory>> {
+        Some(self)
+    }
+
+    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+        self
+    }
+}
+
+struct NativeAgentSessionEditor(Entity<Thread>);
+
+impl acp_thread::AgentSessionEditor for NativeAgentSessionEditor {
+    fn truncate(&self, message_id: acp_thread::UserMessageId, cx: &mut App) -> Task<Result<()>> {
+        Task::ready(
+            self.0
+                .update(cx, |thread, cx| thread.truncate(message_id, cx)),
+        )
+    }
+}
+
+impl acp_thread::AgentHistory for NativeAgentConnection {
+    fn list_threads(&self, cx: &mut App) -> Task<Result<Vec<AcpThreadMetadata>>> {
+        let database = self.0.read(cx).thread_database.clone();
+        cx.background_executor().spawn(async move {
+            let threads = database.list_threads().await?;
+            anyhow::Ok(
+                threads
+                    .into_iter()
+                    .map(|thread| thread.to_acp(NATIVE_AGENT_SERVER_NAME))
+                    .collect::<Vec<_>>(),
+            )
+        })
+    }
+
+    fn observe_history(&self, cx: &mut App) -> mpsc::UnboundedReceiver<AcpThreadMetadata> {
+        let (tx, rx) = mpsc::unbounded();
+        self.0.update(cx, |this, _| this.history_watchers.push(tx));
+        rx
     }
 
     fn load_thread(
@@ -980,83 +1058,6 @@ impl acp_thread::AgentConnection for NativeAgentConnection {
             Ok(acp_thread)
         })
     }
-
-    fn model_selector(&self) -> Option<Rc<dyn AgentModelSelector>> {
-        Some(Rc::new(self.clone()) as Rc<dyn AgentModelSelector>)
-    }
-
-    fn prompt(
-        &self,
-        id: Option<acp_thread::UserMessageId>,
-        params: acp::PromptRequest,
-        cx: &mut App,
-    ) -> Task<Result<acp::PromptResponse>> {
-        let id = id.expect("UserMessageId is required");
-        let session_id = params.session_id.clone();
-        log::info!("Received prompt request for session: {}", session_id);
-        log::debug!("Prompt blocks count: {}", params.prompt.len());
-
-        self.run_turn(session_id, cx, |thread, cx| {
-            let content: Vec<UserMessageContent> = params
-                .prompt
-                .into_iter()
-                .map(Into::into)
-                .collect::<Vec<_>>();
-            log::info!("Converted prompt to message: {} chars", content.len());
-            log::debug!("Message id: {:?}", id);
-            log::debug!("Message content: {:?}", content);
-
-            thread.update(cx, |thread, cx| thread.send(id, content, cx))
-        })
-    }
-
-    fn resume(
-        &self,
-        session_id: &acp::SessionId,
-        _cx: &mut App,
-    ) -> Option<Rc<dyn acp_thread::AgentSessionResume>> {
-        Some(Rc::new(NativeAgentSessionResume {
-            connection: self.clone(),
-            session_id: session_id.clone(),
-        }) as _)
-    }
-
-    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
-        log::info!("Cancelling on session: {}", session_id);
-        self.0.update(cx, |agent, cx| {
-            if let Some(agent) = agent.sessions.get(session_id) {
-                agent.thread.update(cx, |thread, cx| thread.cancel(cx));
-            }
-        });
-    }
-
-    fn session_editor(
-        &self,
-        session_id: &agent_client_protocol::SessionId,
-        cx: &mut App,
-    ) -> Option<Rc<dyn acp_thread::AgentSessionEditor>> {
-        self.0.update(cx, |agent, _cx| {
-            agent
-                .sessions
-                .get(session_id)
-                .map(|session| Rc::new(NativeAgentSessionEditor(session.thread.clone())) as _)
-        })
-    }
-
-    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
-        self
-    }
-}
-
-struct NativeAgentSessionEditor(Entity<Thread>);
-
-impl acp_thread::AgentSessionEditor for NativeAgentSessionEditor {
-    fn truncate(&self, message_id: acp_thread::UserMessageId, cx: &mut App) -> Task<Result<()>> {
-        Task::ready(
-            self.0
-                .update(cx, |thread, cx| thread.truncate(message_id, cx)),
-        )
-    }
 }
 
 struct NativeAgentSessionResume {
@@ -1274,16 +1275,22 @@ mod tests {
         )
         .await
         .unwrap();
-        let connection = NativeAgentConnection(agent.clone());
-        let history_store = cx.new(|cx| {
-            let mut store = HistoryStore::new(cx);
-            store.register_agent(NATIVE_AGENT_SERVER_NAME.clone(), &connection, cx);
-            store
-        });
+        let connection = Rc::new(NativeAgentConnection(agent.clone()));
+        let history = connection.clone().history().unwrap();
+        let history_store = cx.new(|cx| HistoryStore::get_or_init(cx));
+
+        history_store
+            .update(cx, |history_store, cx| {
+                history_store.load_history(NATIVE_AGENT_SERVER_NAME.clone(), history.as_ref(), cx)
+            })
+            .await
+            .unwrap();
 
         let acp_thread = cx
             .update(|cx| {
-                Rc::new(connection.clone()).new_thread(project.clone(), Path::new(path!("")), cx)
+                connection
+                    .clone()
+                    .new_thread(project.clone(), Path::new(path!("")), cx)
             })
             .await
             .unwrap();

crates/agent2/src/db.rs 🔗

@@ -1,4 +1,5 @@
 use crate::{AgentMessage, AgentMessageContent, UserMessage, UserMessageContent};
+use acp_thread::{AcpThreadMetadata, AgentServerName};
 use agent::thread_store;
 use agent_client_protocol as acp;
 use agent_settings::{AgentProfileId, CompletionMode};
@@ -30,6 +31,17 @@ pub struct DbThreadMetadata {
     pub updated_at: DateTime<Utc>,
 }
 
+impl DbThreadMetadata {
+    pub fn to_acp(self, agent: AgentServerName) -> AcpThreadMetadata {
+        AcpThreadMetadata {
+            agent,
+            id: self.id,
+            title: self.title,
+            updated_at: self.updated_at,
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct DbThread {
     pub title: SharedString,
@@ -288,7 +300,7 @@ impl ThreadsDatabase {
         connection: &Arc<Mutex<Connection>>,
         id: acp::SessionId,
         thread: DbThread,
-    ) -> Result<()> {
+    ) -> Result<DbThreadMetadata> {
         let json_data = serde_json::to_string(&thread)?;
         let title = thread.title.to_string();
         let updated_at = thread.updated_at.to_rfc3339();
@@ -303,9 +315,13 @@ impl ThreadsDatabase {
             INSERT OR REPLACE INTO threads (id, summary, updated_at, data_type, data) VALUES (?, ?, ?, ?, ?)
         "})?;
 
-        insert((id.0, title, updated_at, data_type, data))?;
+        insert((id.0.clone(), title, updated_at, data_type, data))?;
 
-        Ok(())
+        Ok(DbThreadMetadata {
+            id,
+            title: thread.title,
+            updated_at: thread.updated_at,
+        })
     }
 
     pub fn list_threads(&self) -> Task<Result<Vec<DbThreadMetadata>>> {
@@ -360,7 +376,11 @@ impl ThreadsDatabase {
         })
     }
 
-    pub fn save_thread(&self, id: acp::SessionId, thread: DbThread) -> Task<Result<()>> {
+    pub fn save_thread(
+        &self,
+        id: acp::SessionId,
+        thread: DbThread,
+    ) -> Task<Result<DbThreadMetadata>> {
         let connection = self.connection.clone();
 
         self.executor

crates/agent2/src/history_store.rs 🔗

@@ -1,12 +1,17 @@
 use acp_thread::{AcpThreadMetadata, AgentConnection, AgentServerName};
 use agent_client_protocol as acp;
+use agent_servers::AgentServer;
 use assistant_context::SavedContextMetadata;
 use chrono::{DateTime, Utc};
 use collections::HashMap;
-use gpui::{SharedString, Task, prelude::*};
+use gpui::{Entity, Global, SharedString, Task, prelude::*};
+use project::Project;
 use serde::{Deserialize, Serialize};
+use ui::App;
 
-use std::{path::Path, sync::Arc, time::Duration};
+use std::{path::Path, rc::Rc, sync::Arc, time::Duration};
+
+use crate::NativeAgentServer;
 
 const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
 const NAVIGATION_HISTORY_PATH: &str = "agent-navigation-history.json";
@@ -59,41 +64,88 @@ enum SerializedRecentOpen {
     Context(String),
 }
 
+#[derive(Default)]
 pub struct AgentHistory {
-    entries: watch::Receiver<Option<Vec<AcpThreadMetadata>>>,
-    _task: Task<()>,
+    entries: HashMap<acp::SessionId, AcpThreadMetadata>,
+    loaded: bool,
 }
 
 pub struct HistoryStore {
     agents: HashMap<AgentServerName, AgentHistory>, // todo!() text threads
 }
+// note, we have to share the history store between all windows
+// because we only get updates from one connection at a time.
+struct GlobalHistoryStore(Entity<HistoryStore>);
+impl Global for GlobalHistoryStore {}
 
 impl HistoryStore {
-    pub fn new(_cx: &mut Context<Self>) -> Self {
+    pub fn get_or_init(project: &Entity<Project>, cx: &mut App) -> Entity<Self> {
+        if cx.has_global::<GlobalHistoryStore>() {
+            return cx.global::<GlobalHistoryStore>().0.clone();
+        }
+        let history_store = cx.new(|cx| HistoryStore::new(cx));
+        cx.set_global(GlobalHistoryStore(history_store.clone()));
+        let root_dir = project
+            .read(cx)
+            .visible_worktrees(cx)
+            .next()
+            .map(|worktree| worktree.read(cx).abs_path())
+            .unwrap_or_else(|| paths::home_dir().as_path().into());
+
+        let agent = NativeAgentServer::new(project.read(cx).fs().clone());
+        let connect = agent.connect(&root_dir, project, cx);
+        cx.spawn({
+            let history_store = history_store.clone();
+            async move |cx| {
+                let connection = connect.await?.history().unwrap();
+                history_store
+                    .update(cx, |history_store, cx| {
+                        history_store.load_history(agent.name(), connection.as_ref(), cx)
+                    })?
+                    .await
+            }
+        })
+        .detach_and_log_err(cx);
+        history_store
+    }
+
+    fn new(_cx: &mut Context<Self>) -> Self {
         Self {
             agents: HashMap::default(),
         }
     }
 
-    pub fn register_agent(
+    pub fn update_history(&mut self, entry: AcpThreadMetadata, cx: &mut Context<Self>) {
+        let agent = self
+            .agents
+            .entry(entry.agent.clone())
+            .or_insert(Default::default());
+
+        agent.entries.insert(entry.id.clone(), entry);
+        cx.notify()
+    }
+
+    pub fn load_history(
         &mut self,
         agent_name: AgentServerName,
-        connection: &dyn AgentConnection,
+        connection: &dyn acp_thread::AgentHistory,
         cx: &mut Context<Self>,
-    ) {
-        let Some(mut history) = connection.list_threads(cx) else {
-            return;
-        };
-        let history = AgentHistory {
-            entries: history.clone(),
-            _task: cx.spawn(async move |this, cx| {
-                dbg!("loaded", history.borrow().as_ref().map(|b| b.len()));
-                while history.changed().await.is_ok() {
-                    this.update(cx, |_, cx| cx.notify()).ok();
-                }
-            }),
-        };
-        self.agents.insert(agent_name.clone(), history);
+    ) -> Task<anyhow::Result<()>> {
+        let threads = connection.list_threads(cx);
+        cx.spawn(async move |this, cx| {
+            let threads = threads.await?;
+
+            this.update(cx, |this, cx| {
+                this.agents.insert(
+                    agent_name,
+                    AgentHistory {
+                        loaded: true,
+                        entries: threads.into_iter().map(|t| (t.id.clone(), t)).collect(),
+                    },
+                );
+                cx.notify()
+            })
+        })
     }
 
     pub fn entries(&mut self, _cx: &mut Context<Self>) -> Vec<HistoryEntry> {
@@ -107,7 +159,7 @@ impl HistoryStore {
         history_entries.extend(
             self.agents
                 .values_mut()
-                .flat_map(|history| history.entries.borrow().clone().unwrap_or_default()) // todo!("surface the loading state?")
+                .flat_map(|history| history.entries.values().cloned()) // todo!("surface the loading state?")
                 .map(HistoryEntry::AcpThread),
         );
         // todo!() include the text threads in here.

crates/agent_ui/src/acp/thread_history.rs 🔗

@@ -5,8 +5,8 @@ use chrono::{Datelike as _, Local, NaiveDate, TimeDelta};
 use editor::{Editor, EditorEvent};
 use fuzzy::{StringMatch, StringMatchCandidate};
 use gpui::{
-    App, Empty, Entity, EventEmitter, FocusHandle, Focusable, ScrollStrategy, Stateful, Task,
-    UniformListScrollHandle, Window, uniform_list,
+    App, Empty, Entity, EventEmitter, FocusHandle, Focusable, Global, ScrollStrategy, Stateful,
+    Task, UniformListScrollHandle, Window, uniform_list,
 };
 use project::Project;
 use std::{fmt::Display, ops::Range, sync::Arc};
@@ -18,7 +18,7 @@ use ui::{
 use util::ResultExt;
 
 pub struct AcpThreadHistory {
-    history_store: Entity<HistoryStore>,
+    pub(crate) history_store: Entity<HistoryStore>,
     scroll_handle: UniformListScrollHandle,
     selected_index: usize,
     hovered_index: Option<usize>,
@@ -69,37 +69,12 @@ impl AcpThreadHistory {
         window: &mut Window,
         cx: &mut Context<Self>,
     ) -> Self {
-        let history_store = cx.new(|cx| agent2::HistoryStore::new(cx));
-
-        let agent = NativeAgentServer::new(project.read(cx).fs().clone());
-
-        let root_dir = project
-            .read(cx)
-            .visible_worktrees(cx)
-            .next()
-            .map(|worktree| worktree.read(cx).abs_path())
-            .unwrap_or_else(|| paths::home_dir().as_path().into());
-
-        // todo!() reuse this connection for sending messages
-        let connect = agent.connect(&root_dir, project, cx);
-        cx.spawn(async move |this, cx| {
-            let connection = connect.await?;
-            this.update(cx, |this, cx| {
-                this.history_store.update(cx, |this, cx| {
-                    this.register_agent(agent.name(), connection.as_ref(), cx)
-                })
-            })?;
-            // todo!() we must keep it alive
-            std::mem::forget(connection);
-            anyhow::Ok(())
-        })
-        .detach();
-
         let search_editor = cx.new(|cx| {
             let mut editor = Editor::single_line(window, cx);
             editor.set_placeholder_text("Search threads...", cx);
             editor
         });
+        let history_store = HistoryStore::get_or_init(project, cx);
 
         let search_editor_subscription =
             cx.subscribe(&search_editor, |this, search_editor, event, cx| {

crates/agent_ui/src/acp/thread_view.rs 🔗

@@ -18,6 +18,7 @@ use editor::scroll::Autoscroll;
 use editor::{Editor, EditorMode, MultiBuffer, PathKey, SelectionEffects};
 use file_icons::FileIcons;
 use fs::Fs;
+use futures::StreamExt;
 use gpui::{
     Action, Animation, AnimationExt, App, BorderStyle, ClickEvent, ClipboardItem, EdgesRefinement,
     Empty, Entity, FocusHandle, Focusable, Hsla, Length, ListOffset, ListState, MouseButton,
@@ -123,6 +124,7 @@ pub struct AcpThreadView {
     editor_expanded: bool,
     terminal_expanded: bool,
     editing_message: Option<usize>,
+    history_store: Entity<agent2::HistoryStore>,
     _cancel_task: Option<Task<()>>,
     _subscriptions: [Subscription; 3],
 }
@@ -134,6 +136,7 @@ enum ThreadState {
     Ready {
         thread: Entity<AcpThread>,
         _subscription: [Subscription; 2],
+        _history_task: Option<Task<()>>,
     },
     LoadError(LoadError),
     Unauthenticated {
@@ -149,6 +152,7 @@ impl AcpThreadView {
         agent: Rc<dyn AgentServer>,
         workspace: WeakEntity<Workspace>,
         project: Entity<Project>,
+        history_store: Entity<agent2::HistoryStore>,
         thread_store: Entity<ThreadStore>,
         text_thread_store: Entity<TextThreadStore>,
         restore_thread: Option<AcpThreadMetadata>,
@@ -196,11 +200,13 @@ impl AcpThreadView {
             thread_state: Self::initial_state(
                 agent,
                 restore_thread,
+                history_store.clone(),
                 workspace,
                 project,
                 window,
                 cx,
             ),
+            history_store,
             message_editor,
             model_selector: None,
             profile_selector: None,
@@ -225,6 +231,7 @@ impl AcpThreadView {
     fn initial_state(
         agent: Rc<dyn AgentServer>,
         restore_thread: Option<AcpThreadMetadata>,
+        history_store: Entity<agent2::HistoryStore>,
         workspace: WeakEntity<Workspace>,
         project: Entity<Project>,
         window: &mut Window,
@@ -251,6 +258,25 @@ impl AcpThreadView {
                 }
             };
 
+            let mut history_task = None;
+            let history = connection.clone().history();
+            if let Some(history) = history.clone() {
+                if let Some(mut history) = cx.update(|_, cx| history.observe_history(cx)).ok() {
+                    history_task = Some(cx.spawn(async move |cx| {
+                        while let Some(update) = history.next().await {
+                            if !history_store
+                                .update(cx, |history_store, cx| {
+                                    history_store.update_history(update, cx)
+                                })
+                                .is_ok()
+                            {
+                                break;
+                            }
+                        }
+                    }));
+                }
+            }
+
             // this.update_in(cx, |_this, _window, cx| {
             //     let status = connection.exit_status(cx);
             //     cx.spawn(async move |this, cx| {
@@ -264,15 +290,12 @@ impl AcpThreadView {
             //     .detach();
             // })
             // .ok();
-            //
+            let history = connection.clone().history();
             let task = cx.update(|_, cx| {
-                if let Some(restore_thread) = restore_thread {
-                    connection.clone().load_thread(
-                        project.clone(),
-                        &root_dir,
-                        restore_thread.id,
-                        cx,
-                    )
+                if let Some(restore_thread) = restore_thread
+                    && let Some(history) = history
+                {
+                    history.load_thread(project.clone(), &root_dir, restore_thread.id, cx)
                 } else {
                     connection
                         .clone()
@@ -342,6 +365,7 @@ impl AcpThreadView {
                         this.thread_state = ThreadState::Ready {
                             thread,
                             _subscription: [thread_subscription, action_log_subscription],
+                            _history_task: history_task,
                         };
 
                         this.profile_selector = this.as_native_thread(cx).map(|thread| {
@@ -751,6 +775,7 @@ impl AcpThreadView {
                         this.thread_state = Self::initial_state(
                             agent,
                             None, // todo!()
+                            this.history_store.clone(),
                             this.workspace.clone(),
                             project.clone(),
                             window,
@@ -3755,6 +3780,8 @@ pub(crate) mod tests {
             cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx)));
         let text_thread_store =
             cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx)));
+        let history_store =
+            cx.update(|_window, cx| cx.new(|cx| agent2::HistoryStore::get_or_init(cx)));
 
         let thread_view = cx.update(|window, cx| {
             cx.new(|cx| {
@@ -3762,6 +3789,7 @@ pub(crate) mod tests {
                     Rc::new(agent),
                     workspace.downgrade(),
                     project,
+                    history_store.clone(),
                     thread_store.clone(),
                     text_thread_store.clone(),
                     None,
@@ -3954,6 +3982,8 @@ pub(crate) mod tests {
             cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx)));
         let text_thread_store =
             cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx)));
+        let history_store =
+            cx.update(|_window, cx| cx.new(|cx| agent2::HistoryStore::get_or_init(cx)));
 
         let connection = Rc::new(StubAgentConnection::new());
         let thread_view = cx.update(|window, cx| {
@@ -3962,6 +3992,7 @@ pub(crate) mod tests {
                     Rc::new(StubAgentServer::new(connection.as_ref().clone())),
                     workspace.downgrade(),
                     project.clone(),
+                    history_store,
                     thread_store.clone(),
                     text_thread_store.clone(),
                     None,

crates/agent_ui/src/agent_panel.rs 🔗

@@ -1031,11 +1031,13 @@ impl AgentPanel {
             };
 
             this.update_in(cx, |this, window, cx| {
+                let acp_history_store = this.acp_history.read(cx).history_store.clone();
                 let thread_view = cx.new(|cx| {
                     crate::acp::AcpThreadView::new(
                         server,
                         workspace.clone(),
                         project,
+                        acp_history_store,
                         thread_store.clone(),
                         text_thread_store.clone(),
                         restore_thread,