diff --git a/crates/agent_servers/src/agent_servers.rs b/crates/agent_servers/src/agent_servers.rs index adcbd923c2aecbd88e71037591687acda9f57fac..a12b63164325cfc447e44b3a5899e79b774e141f 100644 --- a/crates/agent_servers/src/agent_servers.rs +++ b/crates/agent_servers/src/agent_servers.rs @@ -9,7 +9,6 @@ use collections::{HashMap, HashSet}; pub use custom::*; use fs::Fs; use http_client::read_no_proxy_from_env; -use project::Project; use project::agent_server_store::AgentServerStore; use acp_thread::AgentConnection; @@ -22,29 +21,19 @@ pub use acp::AcpConnection; pub struct AgentServerDelegate { store: Entity, - project: Entity, - status_tx: Option>, new_version_available: Option>>, } impl AgentServerDelegate { pub fn new( store: Entity, - project: Entity, - status_tx: Option>, new_version_tx: Option>>, ) -> Self { Self { store, - project, - status_tx, new_version_available: new_version_tx, } } - - pub fn project(&self) -> &Entity { - &self.project - } } pub trait AgentServer: Send { diff --git a/crates/agent_servers/src/custom.rs b/crates/agent_servers/src/custom.rs index 0a1830717217872868e66a8222902c49eeaabf9c..d87b9dc4ece042d94da6e6e0ac99e1474c1ce018 100644 --- a/crates/agent_servers/src/custom.rs +++ b/crates/agent_servers/src/custom.rs @@ -364,7 +364,6 @@ impl AgentServer for CustomAgentServer { })?; anyhow::Ok(agent.get_command( extra_env, - delegate.status_tx, delegate.new_version_available, &mut cx.to_async(), )) diff --git a/crates/agent_servers/src/e2e_tests.rs b/crates/agent_servers/src/e2e_tests.rs index a0150d41726c94dc830be70e006f4370de919ead..5dcf416bb87ba4812e1a828c23d49819f2874a99 100644 --- a/crates/agent_servers/src/e2e_tests.rs +++ b/crates/agent_servers/src/e2e_tests.rs @@ -431,7 +431,7 @@ pub async fn new_test_thread( cx: &mut TestAppContext, ) -> Entity { let store = project.read_with(cx, |project, _| project.agent_server_store().clone()); - let delegate = AgentServerDelegate::new(store, project.clone(), None, None); + let delegate = AgentServerDelegate::new(store, None); let connection = cx.update(|cx| server.connect(delegate, cx)).await.unwrap(); diff --git a/crates/agent_ui/src/agent_connection_store.rs b/crates/agent_ui/src/agent_connection_store.rs new file mode 100644 index 0000000000000000000000000000000000000000..c0c4519bcc64d53690dd782a55e6b9da4f498fe0 --- /dev/null +++ b/crates/agent_ui/src/agent_connection_store.rs @@ -0,0 +1,163 @@ +use std::rc::Rc; + +use acp_thread::{AgentConnection, LoadError}; +use agent_servers::{AgentServer, AgentServerDelegate}; +use anyhow::Result; +use collections::HashMap; +use futures::{FutureExt, future::Shared}; +use gpui::{AppContext, Context, Entity, EventEmitter, SharedString, Subscription, Task}; +use project::{AgentServerStore, AgentServersUpdated, Project}; +use watch::Receiver; + +use crate::ExternalAgent; +use project::ExternalAgentServerName; + +pub enum ConnectionEntry { + Connecting { + connect_task: Shared, LoadError>>>, + }, + Connected { + connection: Rc, + }, + Error { + error: LoadError, + }, +} + +impl ConnectionEntry { + pub fn wait_for_connection(&self) -> Shared, LoadError>>> { + match self { + ConnectionEntry::Connecting { connect_task } => connect_task.clone(), + ConnectionEntry::Connected { connection } => { + Task::ready(Ok(connection.clone())).shared() + } + ConnectionEntry::Error { error } => Task::ready(Err(error.clone())).shared(), + } + } +} + +pub enum ConnectionEntryEvent { + NewVersionAvailable(SharedString), +} + +impl EventEmitter for ConnectionEntry {} + +pub struct AgentConnectionStore { + project: Entity, + entries: HashMap>, + _subscriptions: Vec, +} + +impl AgentConnectionStore { + pub fn new(project: Entity, cx: &mut Context) -> Self { + let agent_server_store = project.read(cx).agent_server_store().clone(); + let subscription = cx.subscribe(&agent_server_store, Self::handle_agent_servers_updated); + Self { + project, + entries: HashMap::default(), + _subscriptions: vec![subscription], + } + } + + pub fn request_connection( + &mut self, + key: ExternalAgent, + server: Rc, + cx: &mut Context, + ) -> Entity { + self.entries.get(&key).cloned().unwrap_or_else(|| { + let (mut new_version_rx, connect_task) = self.start_connection(server.clone(), cx); + let connect_task = connect_task.shared(); + + let entry = cx.new(|_cx| ConnectionEntry::Connecting { + connect_task: connect_task.clone(), + }); + + self.entries.insert(key.clone(), entry.clone()); + + cx.spawn({ + let key = key.clone(); + let entry = entry.clone(); + async move |this, cx| match connect_task.await { + Ok(connection) => { + entry.update(cx, |entry, cx| { + if let ConnectionEntry::Connecting { .. } = entry { + *entry = ConnectionEntry::Connected { connection }; + cx.notify(); + } + }); + } + Err(error) => { + entry.update(cx, |entry, cx| { + if let ConnectionEntry::Connecting { .. } = entry { + *entry = ConnectionEntry::Error { error }; + cx.notify(); + } + }); + this.update(cx, |this, _cx| this.entries.remove(&key)).ok(); + } + } + }) + .detach(); + + cx.spawn({ + let entry = entry.clone(); + async move |this, cx| { + while let Ok(version) = new_version_rx.recv().await { + if let Some(version) = version { + entry.update(cx, |_entry, cx| { + cx.emit(ConnectionEntryEvent::NewVersionAvailable( + version.clone().into(), + )); + }); + this.update(cx, |this, _cx| this.entries.remove(&key)).ok(); + } + } + } + }) + .detach(); + + entry + }) + } + + fn handle_agent_servers_updated( + &mut self, + store: Entity, + _: &AgentServersUpdated, + cx: &mut Context, + ) { + let store = store.read(cx); + self.entries.retain(|key, _| match key { + ExternalAgent::NativeAgent => true, + ExternalAgent::Custom { name } => store + .external_agents + .contains_key(&ExternalAgentServerName(name.clone())), + }); + cx.notify(); + } + + fn start_connection( + &self, + server: Rc, + cx: &mut Context, + ) -> ( + Receiver>, + Task, LoadError>>, + ) { + let (new_version_tx, new_version_rx) = watch::channel::>(None); + + let agent_server_store = self.project.read(cx).agent_server_store().clone(); + let delegate = AgentServerDelegate::new(agent_server_store, Some(new_version_tx)); + + let connect_task = server.connect(delegate, cx); + let connect_task = cx.spawn(async move |_this, _cx| match connect_task.await { + Ok(connection) => Ok(connection), + Err(err) => match err.downcast::() { + Ok(load_error) => Err(load_error), + Err(err) => Err(LoadError::Other(SharedString::from(err.to_string()))), + }, + }); + (new_version_rx, connect_task) + } +} diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index 2b9f2f5624072f7b9c9f01f1daecd7e1103c758b..80f8925ad05414b9839ac53953156ef35c43e08f 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -30,6 +30,7 @@ use zed_actions::agent::{ }; use crate::ManageProfiles; +use crate::agent_connection_store::AgentConnectionStore; use crate::ui::{AcpOnboardingModal, ClaudeCodeOnboardingModal}; use crate::{ AddContextServer, AgentDiffPane, ConnectionView, CopyThreadToClipboard, Follow, @@ -790,6 +791,7 @@ pub struct AgentPanel { thread_store: Entity, text_thread_store: Entity, prompt_store: Option>, + connection_store: Entity, context_server_registry: Entity, configuration: Option>, configuration_subscription: Option, @@ -1116,6 +1118,7 @@ impl AgentPanel { language_registry, text_thread_store, prompt_store, + connection_store: cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)), configuration: None, configuration_subscription: None, focus_handle: cx.focus_handle(), @@ -2395,7 +2398,7 @@ impl AgentPanel { window: &mut Window, cx: &mut Context, ) { - let selected_agent = AgentType::from(ext_agent); + let selected_agent = AgentType::from(ext_agent.clone()); if self.selected_agent != selected_agent { self.selected_agent = selected_agent; self.serialize(cx); @@ -2406,9 +2409,13 @@ impl AgentPanel { .is_some() .then(|| self.thread_store.clone()); + let connection_store = self.connection_store.clone(); + let server_view = cx.new(|cx| { crate::ConnectionView::new( server, + connection_store, + ext_agent, resume_session_id, cwd, title, diff --git a/crates/agent_ui/src/agent_ui.rs b/crates/agent_ui/src/agent_ui.rs index 8583e8977a719987b12770eec2d77408187a4e1f..d37dbdbbeb184cac31320b4bc9232354eb3dcc8d 100644 --- a/crates/agent_ui/src/agent_ui.rs +++ b/crates/agent_ui/src/agent_ui.rs @@ -1,4 +1,5 @@ mod agent_configuration; +pub(crate) mod agent_connection_store; mod agent_diff; mod agent_model_selector; mod agent_panel; @@ -212,7 +213,7 @@ pub struct NewNativeAgentThreadFromSummary { } // TODO unify this with AgentType -#[derive(Debug, Clone, PartialEq, Serialize, JsonSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum ExternalAgent { NativeAgent, diff --git a/crates/agent_ui/src/connection_view.rs b/crates/agent_ui/src/connection_view.rs index 07841c42215795ffcccf9f7e5ca684f42a59b498..3b07929813e5583164700905a1fa327f3ac9d964 100644 --- a/crates/agent_ui/src/connection_view.rs +++ b/crates/agent_ui/src/connection_view.rs @@ -8,7 +8,9 @@ use acp_thread::{AgentConnection, Plan}; use action_log::{ActionLog, ActionLogTelemetry}; use agent::{NativeAgentServer, NativeAgentSessionList, SharedThread, ThreadStore}; use agent_client_protocol::{self as acp, PromptCapabilities}; -use agent_servers::{AgentServer, AgentServerDelegate}; +use agent_servers::AgentServer; +#[cfg(test)] +use agent_servers::AgentServerDelegate; use agent_settings::{AgentProfileId, AgentSettings}; use anyhow::{Result, anyhow}; use arrayvec::ArrayVec; @@ -65,6 +67,7 @@ use super::entry_view_state::EntryViewState; use super::thread_history::ThreadHistory; use crate::ModeSelector; use crate::ModelSelectorPopover; +use crate::agent_connection_store::{AgentConnectionStore, ConnectionEntryEvent}; use crate::agent_diff::AgentDiff; use crate::entry_view_state::{EntryViewEvent, ViewEvent}; use crate::message_editor::{MessageEditor, MessageEditorEvent}; @@ -73,10 +76,10 @@ use crate::ui::{AgentNotification, AgentNotificationEvent}; use crate::{ AgentDiffPane, AgentInitialContent, AgentPanel, AllowAlways, AllowOnce, AuthorizeToolCall, ClearMessageQueue, CycleFavoriteModels, CycleModeSelector, CycleThinkingEffort, - EditFirstQueuedMessage, ExpandMessageEditor, Follow, KeepAll, NewThread, OpenAddContextMenu, - OpenAgentDiff, OpenHistory, RejectAll, RejectOnce, RemoveFirstQueuedMessage, SendImmediately, - SendNextQueuedMessage, ToggleFastMode, ToggleProfileSelector, ToggleThinkingEffortMenu, - ToggleThinkingMode, UndoLastReject, + EditFirstQueuedMessage, ExpandMessageEditor, ExternalAgent, Follow, KeepAll, NewThread, + OpenAddContextMenu, OpenAgentDiff, OpenHistory, RejectAll, RejectOnce, + RemoveFirstQueuedMessage, SendImmediately, SendNextQueuedMessage, ToggleFastMode, + ToggleProfileSelector, ToggleThinkingEffortMenu, ToggleThinkingMode, UndoLastReject, }; const STOPWATCH_THRESHOLD: Duration = Duration::from_secs(30); @@ -303,6 +306,8 @@ impl EventEmitter for ConnectionView {} pub struct ConnectionView { agent: Rc, + connection_store: Entity, + connection_key: ExternalAgent, agent_server_store: Entity, workspace: WeakEntity, project: Entity, @@ -414,6 +419,7 @@ pub struct ConnectedServerState { threads: HashMap>, connection: Rc, conversation: Entity, + _connection_entry_subscription: Subscription, } enum AuthState { @@ -434,9 +440,7 @@ impl AuthState { struct LoadingView { session_id: Option, - title: SharedString, _load_task: Task<()>, - _update_title_task: Task>, } impl ConnectedServerState { @@ -470,6 +474,8 @@ impl ConnectedServerState { impl ConnectionView { pub fn new( agent: Rc, + connection_store: Entity, + connection_key: ExternalAgent, resume_session_id: Option, cwd: Option, title: Option, @@ -509,6 +515,8 @@ impl ConnectionView { Self { agent: agent.clone(), + connection_store: connection_store.clone(), + connection_key: connection_key.clone(), agent_server_store, workspace, project: project.clone(), @@ -516,6 +524,8 @@ impl ConnectionView { prompt_store, server_state: Self::initial_state( agent.clone(), + connection_store, + connection_key, resume_session_id, cwd, title, @@ -558,6 +568,8 @@ impl ConnectionView { let state = Self::initial_state( self.agent.clone(), + self.connection_store.clone(), + self.connection_key.clone(), resume_session_id, cwd, title, @@ -584,6 +596,8 @@ impl ConnectionView { fn initial_state( agent: Rc, + connection_store: Entity, + connection_key: ExternalAgent, resume_session_id: Option, cwd: Option, title: Option, @@ -640,29 +654,31 @@ impl ConnectionView { .or_else(|| worktree_roots.first().cloned()) .unwrap_or_else(|| paths::home_dir().as_path().into()); - let (status_tx, mut status_rx) = watch::channel("Loading…".into()); - let (new_version_available_tx, mut new_version_available_rx) = watch::channel(None); - let delegate = AgentServerDelegate::new( - project.read(cx).agent_server_store().clone(), - project.clone(), - Some(status_tx), - Some(new_version_available_tx), - ); + let connection_entry = connection_store.update(cx, |store, cx| { + store.request_connection(connection_key, agent.clone(), cx) + }); + + let connection_entry_subscription = + cx.subscribe(&connection_entry, |this, _entry, event, cx| match event { + ConnectionEntryEvent::NewVersionAvailable(version) => { + if let Some(thread) = this.active_thread() { + thread.update(cx, |thread, cx| { + thread.new_server_version_available = Some(version.clone()); + cx.notify(); + }); + } + } + }); + + let connect_result = connection_entry.read(cx).wait_for_connection(); - let connect_task = agent.connect(delegate, cx); let load_session_id = resume_session_id.clone(); let load_task = cx.spawn_in(window, async move |this, cx| { - let connection = match connect_task.await { + let connection = match connect_result.await { Ok(connection) => connection, Err(err) => { this.update_in(cx, |this, window, cx| { - if err.downcast_ref::().is_some() { - this.handle_load_error(load_session_id.clone(), err, window, cx); - } else if let Some(active) = this.active_thread() { - active.update(cx, |active, cx| active.handle_thread_error(err, cx)); - } else { - this.handle_load_error(load_session_id.clone(), err, window, cx); - } + this.handle_load_error(load_session_id.clone(), err, window, cx); cx.notify(); }) .log_err(); @@ -776,52 +792,27 @@ impl ConnectionView { active_id: Some(id.clone()), threads: HashMap::from_iter([(id, current)]), conversation, + _connection_entry_subscription: connection_entry_subscription, }), cx, ); } Err(err) => { - this.handle_load_error(load_session_id.clone(), err, window, cx); + this.handle_load_error( + load_session_id.clone(), + LoadError::Other(err.to_string().into()), + window, + cx, + ); } }; }) .log_err(); }); - cx.spawn(async move |this, cx| { - while let Ok(new_version) = new_version_available_rx.recv().await { - if let Some(new_version) = new_version { - this.update(cx, |this, cx| { - if let Some(thread) = this.active_thread() { - thread.update(cx, |thread, _cx| { - thread.new_server_version_available = Some(new_version.into()); - }); - } - cx.notify(); - }) - .ok(); - } - } - }) - .detach(); - - let loading_view = cx.new(|cx| { - let update_title_task = cx.spawn(async move |this, cx| { - loop { - let status = status_rx.recv().await?; - this.update(cx, |this: &mut LoadingView, cx| { - this.title = status; - cx.notify(); - })?; - } - }); - - LoadingView { - session_id: resume_session_id, - title: "Loading…".into(), - _load_task: load_task, - _update_title_task: update_title_task, - } + let loading_view = cx.new(|_cx| LoadingView { + session_id: resume_session_id, + _load_task: load_task, }); ServerState::Loading(loading_view) @@ -1099,6 +1090,7 @@ impl ConnectionView { threads: HashMap::default(), connection, conversation: cx.new(|_cx| Conversation::default()), + _connection_entry_subscription: Subscription::new(|| {}), }), cx, ); @@ -1111,7 +1103,7 @@ impl ConnectionView { fn handle_load_error( &mut self, session_id: Option, - err: anyhow::Error, + err: LoadError, window: &mut Window, cx: &mut Context, ) { @@ -1125,15 +1117,10 @@ impl ConnectionView { self.focus_handle.focus(window, cx) } } - let load_error = if let Some(load_err) = err.downcast_ref::() { - load_err.clone() - } else { - LoadError::Other(format!("{:#}", err).into()) - }; - self.emit_load_error_telemetry(&load_error); + self.emit_load_error_telemetry(&err); self.set_server_state( ServerState::LoadError { - error: load_error, + error: err, session_id, }, cx, @@ -1172,10 +1159,10 @@ impl ConnectionView { &self.workspace } - pub fn title(&self, cx: &App) -> SharedString { + pub fn title(&self, _cx: &App) -> SharedString { match &self.server_state { ServerState::Connected(_) => "New Thread".into(), - ServerState::Loading(loading_view) => loading_view.read(cx).title.clone(), + ServerState::Loading(_) => "Loading…".into(), ServerState::LoadError { error, .. } => match error { LoadError::Unsupported { .. } => format!("Upgrade {}", self.agent.name()).into(), LoadError::FailedToInstall(_) => { @@ -2910,11 +2897,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); // Create history without an initial session list - it will be set after connection let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::default_response()), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, None, None, None, @@ -3010,11 +3003,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::new(ResumeOnlyAgentConnection)), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, Some(SessionId::new("resume-session")), None, None, @@ -3063,11 +3062,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let _thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::new(connection)), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, Some(SessionId::new("session-1")), Some(PathBuf::from("/project/subdir")), None, @@ -3114,11 +3119,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let _thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::new(connection)), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, Some(SessionId::new("session-1")), Some(PathBuf::from("/some/other/path")), None, @@ -3165,11 +3176,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let _thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::new(connection)), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, Some(SessionId::new("session-1")), Some(PathBuf::from("/project/../outside")), None, @@ -3477,12 +3494,18 @@ pub(crate) mod tests { // Set up thread view in workspace 1 let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project1.clone(), cx))); let agent = StubAgentServer::default_response(); let thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(agent), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, None, None, None, @@ -3691,11 +3714,17 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(agent), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, None, None, None, @@ -4410,12 +4439,18 @@ pub(crate) mod tests { let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx))); + let connection_store = + cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx))); let connection = Rc::new(StubAgentConnection::new()); let thread_view = cx.update(|window, cx| { cx.new(|cx| { ConnectionView::new( Rc::new(StubAgentServer::new(connection.as_ref().clone())), + connection_store, + ExternalAgent::Custom { + name: "Test".into(), + }, None, None, None, diff --git a/crates/agent_ui/src/mention_set.rs b/crates/agent_ui/src/mention_set.rs index dc9d793a5ca5012ca2fe719f1e39bb3fc4fa6d66..e072037f1758e00e648dc46c7ee70599c4363eef 100644 --- a/crates/agent_ui/src/mention_set.rs +++ b/crates/agent_ui/src/mention_set.rs @@ -553,12 +553,8 @@ impl MentionSet { project.read(cx).fs().clone(), thread_store, )); - let delegate = AgentServerDelegate::new( - project.read(cx).agent_server_store().clone(), - project.clone(), - None, - None, - ); + let delegate = + AgentServerDelegate::new(project.read(cx).agent_server_store().clone(), None); let connection = server.connect(delegate, cx); cx.spawn(async move |_, cx| { let agent = connection.await?; diff --git a/crates/project/src/agent_server_store.rs b/crates/project/src/agent_server_store.rs index b1dbefa15a3dcaf64c36d027d68060d18f533def..4a7c2b03a4e03ddfa31bed24254ebe275a17c224 100644 --- a/crates/project/src/agent_server_store.rs +++ b/crates/project/src/agent_server_store.rs @@ -100,7 +100,6 @@ pub trait ExternalAgentServer { fn get_command( &mut self, extra_env: HashMap, - status_tx: Option>, new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task>; @@ -243,7 +242,6 @@ impl AgentServerStore { project_id: *project_id, upstream_client: upstream_client.clone(), name: agent_server_name.clone(), - status_tx: None, new_version_available_tx: None, }) as Box, @@ -347,7 +345,6 @@ impl AgentServerStore { pub fn init_remote(session: &AnyProtoClient) { session.add_entity_message_handler(Self::handle_external_agents_updated); - session.add_entity_message_handler(Self::handle_loading_status_updated); session.add_entity_message_handler(Self::handle_new_version_available); } @@ -695,57 +692,38 @@ impl AgentServerStore { .get_mut(&*envelope.payload.name) .map(|entry| entry.server.as_mut()) .with_context(|| format!("agent `{}` not found", envelope.payload.name))?; - let (status_tx, new_version_available_tx) = downstream_client - .clone() - .map(|(project_id, downstream_client)| { - let (status_tx, mut status_rx) = watch::channel(SharedString::from("")); - let (new_version_available_tx, mut new_version_available_rx) = - watch::channel(None); - cx.spawn({ - let downstream_client = downstream_client.clone(); - let name = envelope.payload.name.clone(); - async move |_, _| { - while let Some(status) = status_rx.recv().await.ok() { - downstream_client.send( - proto::ExternalAgentLoadingStatusUpdated { - project_id, - name: name.clone(), - status: status.to_string(), - }, - )?; + let new_version_available_tx = + downstream_client + .clone() + .map(|(project_id, downstream_client)| { + let (new_version_available_tx, mut new_version_available_rx) = + watch::channel(None); + cx.spawn({ + let name = envelope.payload.name.clone(); + async move |_, _| { + if let Some(version) = + new_version_available_rx.recv().await.ok().flatten() + { + downstream_client.send( + proto::NewExternalAgentVersionAvailable { + project_id, + name: name.clone(), + version, + }, + )?; + } + anyhow::Ok(()) } - anyhow::Ok(()) - } - }) - .detach_and_log_err(cx); - cx.spawn({ - let name = envelope.payload.name.clone(); - async move |_, _| { - if let Some(version) = - new_version_available_rx.recv().await.ok().flatten() - { - downstream_client.send( - proto::NewExternalAgentVersionAvailable { - project_id, - name: name.clone(), - version, - }, - )?; - } - anyhow::Ok(()) - } - }) - .detach_and_log_err(cx); - (status_tx, new_version_available_tx) - }) - .unzip(); + }) + .detach_and_log_err(cx); + new_version_available_tx + }); let mut extra_env = HashMap::default(); if no_browser { extra_env.insert("NO_BROWSER".to_owned(), "1".to_owned()); } anyhow::Ok(agent.get_command( extra_env, - status_tx, new_version_available_tx, &mut cx.to_async(), )) @@ -782,13 +760,11 @@ impl AgentServerStore { }; let mut previous_entries = std::mem::take(&mut this.external_agents); - let mut status_txs = HashMap::default(); let mut new_version_available_txs = HashMap::default(); let mut metadata = HashMap::default(); for (name, mut entry) in previous_entries.drain() { if let Some(agent) = entry.server.downcast_mut::() { - status_txs.insert(name.clone(), agent.status_tx.take()); new_version_available_txs .insert(name.clone(), agent.new_version_available_tx.take()); } @@ -820,7 +796,6 @@ impl AgentServerStore { project_id: *project_id, upstream_client: upstream_client.clone(), name: agent_name.clone(), - status_tx: status_txs.remove(&agent_name).flatten(), new_version_available_tx: new_version_available_txs .remove(&agent_name) .flatten(), @@ -884,22 +859,6 @@ impl AgentServerStore { }) } - async fn handle_loading_status_updated( - this: Entity, - envelope: TypedEnvelope, - mut cx: AsyncApp, - ) -> Result<()> { - this.update(&mut cx, |this, _| { - if let Some(agent) = this.external_agents.get_mut(&*envelope.payload.name) - && let Some(agent) = agent.server.downcast_mut::() - && let Some(status_tx) = &mut agent.status_tx - { - status_tx.send(envelope.payload.status.into()).ok(); - } - }); - Ok(()) - } - async fn handle_new_version_available( this: Entity, envelope: TypedEnvelope, @@ -936,7 +895,6 @@ struct RemoteExternalAgentServer { project_id: u64, upstream_client: Entity, name: ExternalAgentServerName, - status_tx: Option>, new_version_available_tx: Option>>, } @@ -944,14 +902,12 @@ impl ExternalAgentServer for RemoteExternalAgentServer { fn get_command( &mut self, extra_env: HashMap, - status_tx: Option>, new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task> { let project_id = self.project_id; let name = self.name.to_string(); let upstream_client = self.upstream_client.downgrade(); - self.status_tx = status_tx; self.new_version_available_tx = new_version_available_tx; cx.spawn(async move |cx| { let mut response = upstream_client @@ -1005,7 +961,6 @@ impl ExternalAgentServer for LocalExtensionArchiveAgent { fn get_command( &mut self, extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task> { @@ -1205,7 +1160,6 @@ impl ExternalAgentServer for LocalRegistryArchiveAgent { fn get_command( &mut self, extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task> { @@ -1386,7 +1340,6 @@ impl ExternalAgentServer for LocalRegistryNpxAgent { fn get_command( &mut self, extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task> { @@ -1453,7 +1406,6 @@ impl ExternalAgentServer for LocalCustomAgent { fn get_command( &mut self, extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, cx: &mut AsyncApp, ) -> Task> { diff --git a/crates/project/tests/integration/ext_agent_tests.rs b/crates/project/tests/integration/ext_agent_tests.rs index f3c398a619a81ee81146de16f8e58b1093569e8a..40961cd0267db9effc897376de9531d5ceb6f463 100644 --- a/crates/project/tests/integration/ext_agent_tests.rs +++ b/crates/project/tests/integration/ext_agent_tests.rs @@ -10,7 +10,6 @@ impl ExternalAgentServer for NoopExternalAgent { fn get_command( &mut self, _extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, _cx: &mut AsyncApp, ) -> Task> { diff --git a/crates/project/tests/integration/extension_agent_tests.rs b/crates/project/tests/integration/extension_agent_tests.rs index eff41a99cab878336206f232450f3c1b490d1fc8..b45f76fbd6835f0cf94f8622df10c2eee3b3c9d3 100644 --- a/crates/project/tests/integration/extension_agent_tests.rs +++ b/crates/project/tests/integration/extension_agent_tests.rs @@ -26,7 +26,6 @@ impl ExternalAgentServer for NoopExternalAgent { fn get_command( &mut self, _extra_env: HashMap, - _status_tx: Option>, _new_version_available_tx: Option>>, _cx: &mut AsyncApp, ) -> Task> { diff --git a/crates/proto/proto/ai.proto b/crates/proto/proto/ai.proto index 428d971c536f6e830e0c056372d311dc7ed7028f..8db36153b5ef75218f0c007e113f1c2c06ded7eb 100644 --- a/crates/proto/proto/ai.proto +++ b/crates/proto/proto/ai.proto @@ -222,7 +222,7 @@ message ExternalExtensionAgentsUpdated { message ExternalAgentLoadingStatusUpdated { uint64 project_id = 1; string name = 2; - string status = 3; + reserved 3; } message NewExternalAgentVersionAvailable { diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index 7f9953c8a4e746d9586b663330badb38149cfb64..0f1d1e3769c405abce5ebf55818f19e64afadc82 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -2028,7 +2028,6 @@ async fn test_remote_external_agent_server( .get_command( HashMap::from_iter([("OTHER_VAR".into(), "other-val".into())]), None, - None, &mut cx.to_async(), ) }) diff --git a/crates/sidebar/Cargo.toml b/crates/sidebar/Cargo.toml index 36a8d1cf085e544d38d903fe63f514539287dcc5..e6b873704ffda9d241fec002eb0fdff0af979c48 100644 --- a/crates/sidebar/Cargo.toml +++ b/crates/sidebar/Cargo.toml @@ -47,4 +47,5 @@ fs = { workspace = true, features = ["test-support"] } gpui = { workspace = true, features = ["test-support"] } project = { workspace = true, features = ["test-support"] } settings = { workspace = true, features = ["test-support"] } -workspace = { workspace = true, features = ["test-support"] } \ No newline at end of file +workspace = { workspace = true, features = ["test-support"] } +recent_projects = { workspace = true, features = ["test-support"] } diff --git a/crates/sidebar/src/sidebar.rs b/crates/sidebar/src/sidebar.rs index ceb566f4c7b22acea44faa3b7f0bf3879d28b7ec..d5cf352665a8cd59bdd6a6b601248bce4a214e3b 100644 --- a/crates/sidebar/src/sidebar.rs +++ b/crates/sidebar/src/sidebar.rs @@ -2569,15 +2569,15 @@ mod tests { let path_list = PathList::new(&[std::path::PathBuf::from("/my-project")]); // Open thread A and keep it generating. - let connection_a = StubAgentConnection::new(); - open_thread_with_connection(&panel, connection_a.clone(), cx); + let connection = StubAgentConnection::new(); + open_thread_with_connection(&panel, connection.clone(), cx); send_message(&panel, cx); let session_id_a = active_session_id(&panel, cx); save_thread_to_store(&session_id_a, &path_list, cx).await; cx.update(|_, cx| { - connection_a.send_update( + connection.send_update( session_id_a.clone(), acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new("working...".into())), cx, @@ -2586,11 +2586,10 @@ mod tests { cx.run_until_parked(); // Open thread B (idle, default response) — thread A goes to background. - let connection_b = StubAgentConnection::new(); - connection_b.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk( + connection.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk( acp::ContentChunk::new("Done".into()), )]); - open_thread_with_connection(&panel, connection_b, cx); + open_thread_with_connection(&panel, connection, cx); send_message(&panel, cx); let session_id_b = active_session_id(&panel, cx);