Detailed changes
@@ -9,7 +9,6 @@ use collections::{HashMap, HashSet};
pub use custom::*;
use fs::Fs;
use http_client::read_no_proxy_from_env;
-use project::Project;
use project::agent_server_store::AgentServerStore;
use acp_thread::AgentConnection;
@@ -22,29 +21,19 @@ pub use acp::AcpConnection;
pub struct AgentServerDelegate {
store: Entity<AgentServerStore>,
- project: Entity<Project>,
- status_tx: Option<watch::Sender<SharedString>>,
new_version_available: Option<watch::Sender<Option<String>>>,
}
impl AgentServerDelegate {
pub fn new(
store: Entity<AgentServerStore>,
- project: Entity<Project>,
- status_tx: Option<watch::Sender<SharedString>>,
new_version_tx: Option<watch::Sender<Option<String>>>,
) -> Self {
Self {
store,
- project,
- status_tx,
new_version_available: new_version_tx,
}
}
-
- pub fn project(&self) -> &Entity<Project> {
- &self.project
- }
}
pub trait AgentServer: Send {
@@ -364,7 +364,6 @@ impl AgentServer for CustomAgentServer {
})?;
anyhow::Ok(agent.get_command(
extra_env,
- delegate.status_tx,
delegate.new_version_available,
&mut cx.to_async(),
))
@@ -431,7 +431,7 @@ pub async fn new_test_thread(
cx: &mut TestAppContext,
) -> Entity<AcpThread> {
let store = project.read_with(cx, |project, _| project.agent_server_store().clone());
- let delegate = AgentServerDelegate::new(store, project.clone(), None, None);
+ let delegate = AgentServerDelegate::new(store, None);
let connection = cx.update(|cx| server.connect(delegate, cx)).await.unwrap();
@@ -0,0 +1,163 @@
+use std::rc::Rc;
+
+use acp_thread::{AgentConnection, LoadError};
+use agent_servers::{AgentServer, AgentServerDelegate};
+use anyhow::Result;
+use collections::HashMap;
+use futures::{FutureExt, future::Shared};
+use gpui::{AppContext, Context, Entity, EventEmitter, SharedString, Subscription, Task};
+use project::{AgentServerStore, AgentServersUpdated, Project};
+use watch::Receiver;
+
+use crate::ExternalAgent;
+use project::ExternalAgentServerName;
+
+pub enum ConnectionEntry {
+ Connecting {
+ connect_task: Shared<Task<Result<Rc<dyn AgentConnection>, LoadError>>>,
+ },
+ Connected {
+ connection: Rc<dyn AgentConnection>,
+ },
+ Error {
+ error: LoadError,
+ },
+}
+
+impl ConnectionEntry {
+ pub fn wait_for_connection(&self) -> Shared<Task<Result<Rc<dyn AgentConnection>, LoadError>>> {
+ match self {
+ ConnectionEntry::Connecting { connect_task } => connect_task.clone(),
+ ConnectionEntry::Connected { connection } => {
+ Task::ready(Ok(connection.clone())).shared()
+ }
+ ConnectionEntry::Error { error } => Task::ready(Err(error.clone())).shared(),
+ }
+ }
+}
+
+pub enum ConnectionEntryEvent {
+ NewVersionAvailable(SharedString),
+}
+
+impl EventEmitter<ConnectionEntryEvent> for ConnectionEntry {}
+
+pub struct AgentConnectionStore {
+ project: Entity<Project>,
+ entries: HashMap<ExternalAgent, Entity<ConnectionEntry>>,
+ _subscriptions: Vec<Subscription>,
+}
+
+impl AgentConnectionStore {
+ pub fn new(project: Entity<Project>, cx: &mut Context<Self>) -> Self {
+ let agent_server_store = project.read(cx).agent_server_store().clone();
+ let subscription = cx.subscribe(&agent_server_store, Self::handle_agent_servers_updated);
+ Self {
+ project,
+ entries: HashMap::default(),
+ _subscriptions: vec![subscription],
+ }
+ }
+
+ pub fn request_connection(
+ &mut self,
+ key: ExternalAgent,
+ server: Rc<dyn AgentServer>,
+ cx: &mut Context<Self>,
+ ) -> Entity<ConnectionEntry> {
+ self.entries.get(&key).cloned().unwrap_or_else(|| {
+ let (mut new_version_rx, connect_task) = self.start_connection(server.clone(), cx);
+ let connect_task = connect_task.shared();
+
+ let entry = cx.new(|_cx| ConnectionEntry::Connecting {
+ connect_task: connect_task.clone(),
+ });
+
+ self.entries.insert(key.clone(), entry.clone());
+
+ cx.spawn({
+ let key = key.clone();
+ let entry = entry.clone();
+ async move |this, cx| match connect_task.await {
+ Ok(connection) => {
+ entry.update(cx, |entry, cx| {
+ if let ConnectionEntry::Connecting { .. } = entry {
+ *entry = ConnectionEntry::Connected { connection };
+ cx.notify();
+ }
+ });
+ }
+ Err(error) => {
+ entry.update(cx, |entry, cx| {
+ if let ConnectionEntry::Connecting { .. } = entry {
+ *entry = ConnectionEntry::Error { error };
+ cx.notify();
+ }
+ });
+ this.update(cx, |this, _cx| this.entries.remove(&key)).ok();
+ }
+ }
+ })
+ .detach();
+
+ cx.spawn({
+ let entry = entry.clone();
+ async move |this, cx| {
+ while let Ok(version) = new_version_rx.recv().await {
+ if let Some(version) = version {
+ entry.update(cx, |_entry, cx| {
+ cx.emit(ConnectionEntryEvent::NewVersionAvailable(
+ version.clone().into(),
+ ));
+ });
+ this.update(cx, |this, _cx| this.entries.remove(&key)).ok();
+ }
+ }
+ }
+ })
+ .detach();
+
+ entry
+ })
+ }
+
+ fn handle_agent_servers_updated(
+ &mut self,
+ store: Entity<AgentServerStore>,
+ _: &AgentServersUpdated,
+ cx: &mut Context<Self>,
+ ) {
+ let store = store.read(cx);
+ self.entries.retain(|key, _| match key {
+ ExternalAgent::NativeAgent => true,
+ ExternalAgent::Custom { name } => store
+ .external_agents
+ .contains_key(&ExternalAgentServerName(name.clone())),
+ });
+ cx.notify();
+ }
+
+ fn start_connection(
+ &self,
+ server: Rc<dyn AgentServer>,
+ cx: &mut Context<Self>,
+ ) -> (
+ Receiver<Option<String>>,
+ Task<Result<Rc<dyn AgentConnection>, LoadError>>,
+ ) {
+ let (new_version_tx, new_version_rx) = watch::channel::<Option<String>>(None);
+
+ let agent_server_store = self.project.read(cx).agent_server_store().clone();
+ let delegate = AgentServerDelegate::new(agent_server_store, Some(new_version_tx));
+
+ let connect_task = server.connect(delegate, cx);
+ let connect_task = cx.spawn(async move |_this, _cx| match connect_task.await {
+ Ok(connection) => Ok(connection),
+ Err(err) => match err.downcast::<LoadError>() {
+ Ok(load_error) => Err(load_error),
+ Err(err) => Err(LoadError::Other(SharedString::from(err.to_string()))),
+ },
+ });
+ (new_version_rx, connect_task)
+ }
+}
@@ -30,6 +30,7 @@ use zed_actions::agent::{
};
use crate::ManageProfiles;
+use crate::agent_connection_store::AgentConnectionStore;
use crate::ui::{AcpOnboardingModal, ClaudeCodeOnboardingModal};
use crate::{
AddContextServer, AgentDiffPane, ConnectionView, CopyThreadToClipboard, Follow,
@@ -790,6 +791,7 @@ pub struct AgentPanel {
thread_store: Entity<ThreadStore>,
text_thread_store: Entity<assistant_text_thread::TextThreadStore>,
prompt_store: Option<Entity<PromptStore>>,
+ connection_store: Entity<AgentConnectionStore>,
context_server_registry: Entity<ContextServerRegistry>,
configuration: Option<Entity<AgentConfiguration>>,
configuration_subscription: Option<Subscription>,
@@ -1116,6 +1118,7 @@ impl AgentPanel {
language_registry,
text_thread_store,
prompt_store,
+ connection_store: cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)),
configuration: None,
configuration_subscription: None,
focus_handle: cx.focus_handle(),
@@ -2395,7 +2398,7 @@ impl AgentPanel {
window: &mut Window,
cx: &mut Context<Self>,
) {
- let selected_agent = AgentType::from(ext_agent);
+ let selected_agent = AgentType::from(ext_agent.clone());
if self.selected_agent != selected_agent {
self.selected_agent = selected_agent;
self.serialize(cx);
@@ -2406,9 +2409,13 @@ impl AgentPanel {
.is_some()
.then(|| self.thread_store.clone());
+ let connection_store = self.connection_store.clone();
+
let server_view = cx.new(|cx| {
crate::ConnectionView::new(
server,
+ connection_store,
+ ext_agent,
resume_session_id,
cwd,
title,
@@ -1,4 +1,5 @@
mod agent_configuration;
+pub(crate) mod agent_connection_store;
mod agent_diff;
mod agent_model_selector;
mod agent_panel;
@@ -212,7 +213,7 @@ pub struct NewNativeAgentThreadFromSummary {
}
// TODO unify this with AgentType
-#[derive(Debug, Clone, PartialEq, Serialize, JsonSchema)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExternalAgent {
NativeAgent,
@@ -8,7 +8,9 @@ use acp_thread::{AgentConnection, Plan};
use action_log::{ActionLog, ActionLogTelemetry};
use agent::{NativeAgentServer, NativeAgentSessionList, SharedThread, ThreadStore};
use agent_client_protocol::{self as acp, PromptCapabilities};
-use agent_servers::{AgentServer, AgentServerDelegate};
+use agent_servers::AgentServer;
+#[cfg(test)]
+use agent_servers::AgentServerDelegate;
use agent_settings::{AgentProfileId, AgentSettings};
use anyhow::{Result, anyhow};
use arrayvec::ArrayVec;
@@ -65,6 +67,7 @@ use super::entry_view_state::EntryViewState;
use super::thread_history::ThreadHistory;
use crate::ModeSelector;
use crate::ModelSelectorPopover;
+use crate::agent_connection_store::{AgentConnectionStore, ConnectionEntryEvent};
use crate::agent_diff::AgentDiff;
use crate::entry_view_state::{EntryViewEvent, ViewEvent};
use crate::message_editor::{MessageEditor, MessageEditorEvent};
@@ -73,10 +76,10 @@ use crate::ui::{AgentNotification, AgentNotificationEvent};
use crate::{
AgentDiffPane, AgentInitialContent, AgentPanel, AllowAlways, AllowOnce, AuthorizeToolCall,
ClearMessageQueue, CycleFavoriteModels, CycleModeSelector, CycleThinkingEffort,
- EditFirstQueuedMessage, ExpandMessageEditor, Follow, KeepAll, NewThread, OpenAddContextMenu,
- OpenAgentDiff, OpenHistory, RejectAll, RejectOnce, RemoveFirstQueuedMessage, SendImmediately,
- SendNextQueuedMessage, ToggleFastMode, ToggleProfileSelector, ToggleThinkingEffortMenu,
- ToggleThinkingMode, UndoLastReject,
+ EditFirstQueuedMessage, ExpandMessageEditor, ExternalAgent, Follow, KeepAll, NewThread,
+ OpenAddContextMenu, OpenAgentDiff, OpenHistory, RejectAll, RejectOnce,
+ RemoveFirstQueuedMessage, SendImmediately, SendNextQueuedMessage, ToggleFastMode,
+ ToggleProfileSelector, ToggleThinkingEffortMenu, ToggleThinkingMode, UndoLastReject,
};
const STOPWATCH_THRESHOLD: Duration = Duration::from_secs(30);
@@ -303,6 +306,8 @@ impl EventEmitter<AcpServerViewEvent> for ConnectionView {}
pub struct ConnectionView {
agent: Rc<dyn AgentServer>,
+ connection_store: Entity<AgentConnectionStore>,
+ connection_key: ExternalAgent,
agent_server_store: Entity<AgentServerStore>,
workspace: WeakEntity<Workspace>,
project: Entity<Project>,
@@ -414,6 +419,7 @@ pub struct ConnectedServerState {
threads: HashMap<acp::SessionId, Entity<ThreadView>>,
connection: Rc<dyn AgentConnection>,
conversation: Entity<Conversation>,
+ _connection_entry_subscription: Subscription,
}
enum AuthState {
@@ -434,9 +440,7 @@ impl AuthState {
struct LoadingView {
session_id: Option<acp::SessionId>,
- title: SharedString,
_load_task: Task<()>,
- _update_title_task: Task<anyhow::Result<()>>,
}
impl ConnectedServerState {
@@ -470,6 +474,8 @@ impl ConnectedServerState {
impl ConnectionView {
pub fn new(
agent: Rc<dyn AgentServer>,
+ connection_store: Entity<AgentConnectionStore>,
+ connection_key: ExternalAgent,
resume_session_id: Option<acp::SessionId>,
cwd: Option<PathBuf>,
title: Option<SharedString>,
@@ -509,6 +515,8 @@ impl ConnectionView {
Self {
agent: agent.clone(),
+ connection_store: connection_store.clone(),
+ connection_key: connection_key.clone(),
agent_server_store,
workspace,
project: project.clone(),
@@ -516,6 +524,8 @@ impl ConnectionView {
prompt_store,
server_state: Self::initial_state(
agent.clone(),
+ connection_store,
+ connection_key,
resume_session_id,
cwd,
title,
@@ -558,6 +568,8 @@ impl ConnectionView {
let state = Self::initial_state(
self.agent.clone(),
+ self.connection_store.clone(),
+ self.connection_key.clone(),
resume_session_id,
cwd,
title,
@@ -584,6 +596,8 @@ impl ConnectionView {
fn initial_state(
agent: Rc<dyn AgentServer>,
+ connection_store: Entity<AgentConnectionStore>,
+ connection_key: ExternalAgent,
resume_session_id: Option<acp::SessionId>,
cwd: Option<PathBuf>,
title: Option<SharedString>,
@@ -640,29 +654,31 @@ impl ConnectionView {
.or_else(|| worktree_roots.first().cloned())
.unwrap_or_else(|| paths::home_dir().as_path().into());
- let (status_tx, mut status_rx) = watch::channel("Loadingβ¦".into());
- let (new_version_available_tx, mut new_version_available_rx) = watch::channel(None);
- let delegate = AgentServerDelegate::new(
- project.read(cx).agent_server_store().clone(),
- project.clone(),
- Some(status_tx),
- Some(new_version_available_tx),
- );
+ let connection_entry = connection_store.update(cx, |store, cx| {
+ store.request_connection(connection_key, agent.clone(), cx)
+ });
+
+ let connection_entry_subscription =
+ cx.subscribe(&connection_entry, |this, _entry, event, cx| match event {
+ ConnectionEntryEvent::NewVersionAvailable(version) => {
+ if let Some(thread) = this.active_thread() {
+ thread.update(cx, |thread, cx| {
+ thread.new_server_version_available = Some(version.clone());
+ cx.notify();
+ });
+ }
+ }
+ });
+
+ let connect_result = connection_entry.read(cx).wait_for_connection();
- let connect_task = agent.connect(delegate, cx);
let load_session_id = resume_session_id.clone();
let load_task = cx.spawn_in(window, async move |this, cx| {
- let connection = match connect_task.await {
+ let connection = match connect_result.await {
Ok(connection) => connection,
Err(err) => {
this.update_in(cx, |this, window, cx| {
- if err.downcast_ref::<LoadError>().is_some() {
- this.handle_load_error(load_session_id.clone(), err, window, cx);
- } else if let Some(active) = this.active_thread() {
- active.update(cx, |active, cx| active.handle_thread_error(err, cx));
- } else {
- this.handle_load_error(load_session_id.clone(), err, window, cx);
- }
+ this.handle_load_error(load_session_id.clone(), err, window, cx);
cx.notify();
})
.log_err();
@@ -776,52 +792,27 @@ impl ConnectionView {
active_id: Some(id.clone()),
threads: HashMap::from_iter([(id, current)]),
conversation,
+ _connection_entry_subscription: connection_entry_subscription,
}),
cx,
);
}
Err(err) => {
- this.handle_load_error(load_session_id.clone(), err, window, cx);
+ this.handle_load_error(
+ load_session_id.clone(),
+ LoadError::Other(err.to_string().into()),
+ window,
+ cx,
+ );
}
};
})
.log_err();
});
- cx.spawn(async move |this, cx| {
- while let Ok(new_version) = new_version_available_rx.recv().await {
- if let Some(new_version) = new_version {
- this.update(cx, |this, cx| {
- if let Some(thread) = this.active_thread() {
- thread.update(cx, |thread, _cx| {
- thread.new_server_version_available = Some(new_version.into());
- });
- }
- cx.notify();
- })
- .ok();
- }
- }
- })
- .detach();
-
- let loading_view = cx.new(|cx| {
- let update_title_task = cx.spawn(async move |this, cx| {
- loop {
- let status = status_rx.recv().await?;
- this.update(cx, |this: &mut LoadingView, cx| {
- this.title = status;
- cx.notify();
- })?;
- }
- });
-
- LoadingView {
- session_id: resume_session_id,
- title: "Loadingβ¦".into(),
- _load_task: load_task,
- _update_title_task: update_title_task,
- }
+ let loading_view = cx.new(|_cx| LoadingView {
+ session_id: resume_session_id,
+ _load_task: load_task,
});
ServerState::Loading(loading_view)
@@ -1099,6 +1090,7 @@ impl ConnectionView {
threads: HashMap::default(),
connection,
conversation: cx.new(|_cx| Conversation::default()),
+ _connection_entry_subscription: Subscription::new(|| {}),
}),
cx,
);
@@ -1111,7 +1103,7 @@ impl ConnectionView {
fn handle_load_error(
&mut self,
session_id: Option<acp::SessionId>,
- err: anyhow::Error,
+ err: LoadError,
window: &mut Window,
cx: &mut Context<Self>,
) {
@@ -1125,15 +1117,10 @@ impl ConnectionView {
self.focus_handle.focus(window, cx)
}
}
- let load_error = if let Some(load_err) = err.downcast_ref::<LoadError>() {
- load_err.clone()
- } else {
- LoadError::Other(format!("{:#}", err).into())
- };
- self.emit_load_error_telemetry(&load_error);
+ self.emit_load_error_telemetry(&err);
self.set_server_state(
ServerState::LoadError {
- error: load_error,
+ error: err,
session_id,
},
cx,
@@ -1172,10 +1159,10 @@ impl ConnectionView {
&self.workspace
}
- pub fn title(&self, cx: &App) -> SharedString {
+ pub fn title(&self, _cx: &App) -> SharedString {
match &self.server_state {
ServerState::Connected(_) => "New Thread".into(),
- ServerState::Loading(loading_view) => loading_view.read(cx).title.clone(),
+ ServerState::Loading(_) => "Loadingβ¦".into(),
ServerState::LoadError { error, .. } => match error {
LoadError::Unsupported { .. } => format!("Upgrade {}", self.agent.name()).into(),
LoadError::FailedToInstall(_) => {
@@ -2910,11 +2897,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
// Create history without an initial session list - it will be set after connection
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::default_response()),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
None,
None,
None,
@@ -3010,11 +3003,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::new(ResumeOnlyAgentConnection)),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
Some(SessionId::new("resume-session")),
None,
None,
@@ -3063,11 +3062,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let _thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::new(connection)),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
Some(SessionId::new("session-1")),
Some(PathBuf::from("/project/subdir")),
None,
@@ -3114,11 +3119,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let _thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::new(connection)),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
Some(SessionId::new("session-1")),
Some(PathBuf::from("/some/other/path")),
None,
@@ -3165,11 +3176,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let _thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::new(connection)),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
Some(SessionId::new("session-1")),
Some(PathBuf::from("/project/../outside")),
None,
@@ -3477,12 +3494,18 @@ pub(crate) mod tests {
// Set up thread view in workspace 1
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project1.clone(), cx)));
let agent = StubAgentServer::default_response();
let thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(agent),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
None,
None,
None,
@@ -3691,11 +3714,17 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(agent),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
None,
None,
None,
@@ -4410,12 +4439,18 @@ pub(crate) mod tests {
let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
let history = cx.update(|window, cx| cx.new(|cx| ThreadHistory::new(None, window, cx)));
+ let connection_store =
+ cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
let connection = Rc::new(StubAgentConnection::new());
let thread_view = cx.update(|window, cx| {
cx.new(|cx| {
ConnectionView::new(
Rc::new(StubAgentServer::new(connection.as_ref().clone())),
+ connection_store,
+ ExternalAgent::Custom {
+ name: "Test".into(),
+ },
None,
None,
None,
@@ -553,12 +553,8 @@ impl MentionSet {
project.read(cx).fs().clone(),
thread_store,
));
- let delegate = AgentServerDelegate::new(
- project.read(cx).agent_server_store().clone(),
- project.clone(),
- None,
- None,
- );
+ let delegate =
+ AgentServerDelegate::new(project.read(cx).agent_server_store().clone(), None);
let connection = server.connect(delegate, cx);
cx.spawn(async move |_, cx| {
let agent = connection.await?;
@@ -100,7 +100,6 @@ pub trait ExternalAgentServer {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- status_tx: Option<watch::Sender<SharedString>>,
new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>>;
@@ -243,7 +242,6 @@ impl AgentServerStore {
project_id: *project_id,
upstream_client: upstream_client.clone(),
name: agent_server_name.clone(),
- status_tx: None,
new_version_available_tx: None,
})
as Box<dyn ExternalAgentServer>,
@@ -347,7 +345,6 @@ impl AgentServerStore {
pub fn init_remote(session: &AnyProtoClient) {
session.add_entity_message_handler(Self::handle_external_agents_updated);
- session.add_entity_message_handler(Self::handle_loading_status_updated);
session.add_entity_message_handler(Self::handle_new_version_available);
}
@@ -695,57 +692,38 @@ impl AgentServerStore {
.get_mut(&*envelope.payload.name)
.map(|entry| entry.server.as_mut())
.with_context(|| format!("agent `{}` not found", envelope.payload.name))?;
- let (status_tx, new_version_available_tx) = downstream_client
- .clone()
- .map(|(project_id, downstream_client)| {
- let (status_tx, mut status_rx) = watch::channel(SharedString::from(""));
- let (new_version_available_tx, mut new_version_available_rx) =
- watch::channel(None);
- cx.spawn({
- let downstream_client = downstream_client.clone();
- let name = envelope.payload.name.clone();
- async move |_, _| {
- while let Some(status) = status_rx.recv().await.ok() {
- downstream_client.send(
- proto::ExternalAgentLoadingStatusUpdated {
- project_id,
- name: name.clone(),
- status: status.to_string(),
- },
- )?;
+ let new_version_available_tx =
+ downstream_client
+ .clone()
+ .map(|(project_id, downstream_client)| {
+ let (new_version_available_tx, mut new_version_available_rx) =
+ watch::channel(None);
+ cx.spawn({
+ let name = envelope.payload.name.clone();
+ async move |_, _| {
+ if let Some(version) =
+ new_version_available_rx.recv().await.ok().flatten()
+ {
+ downstream_client.send(
+ proto::NewExternalAgentVersionAvailable {
+ project_id,
+ name: name.clone(),
+ version,
+ },
+ )?;
+ }
+ anyhow::Ok(())
}
- anyhow::Ok(())
- }
- })
- .detach_and_log_err(cx);
- cx.spawn({
- let name = envelope.payload.name.clone();
- async move |_, _| {
- if let Some(version) =
- new_version_available_rx.recv().await.ok().flatten()
- {
- downstream_client.send(
- proto::NewExternalAgentVersionAvailable {
- project_id,
- name: name.clone(),
- version,
- },
- )?;
- }
- anyhow::Ok(())
- }
- })
- .detach_and_log_err(cx);
- (status_tx, new_version_available_tx)
- })
- .unzip();
+ })
+ .detach_and_log_err(cx);
+ new_version_available_tx
+ });
let mut extra_env = HashMap::default();
if no_browser {
extra_env.insert("NO_BROWSER".to_owned(), "1".to_owned());
}
anyhow::Ok(agent.get_command(
extra_env,
- status_tx,
new_version_available_tx,
&mut cx.to_async(),
))
@@ -782,13 +760,11 @@ impl AgentServerStore {
};
let mut previous_entries = std::mem::take(&mut this.external_agents);
- let mut status_txs = HashMap::default();
let mut new_version_available_txs = HashMap::default();
let mut metadata = HashMap::default();
for (name, mut entry) in previous_entries.drain() {
if let Some(agent) = entry.server.downcast_mut::<RemoteExternalAgentServer>() {
- status_txs.insert(name.clone(), agent.status_tx.take());
new_version_available_txs
.insert(name.clone(), agent.new_version_available_tx.take());
}
@@ -820,7 +796,6 @@ impl AgentServerStore {
project_id: *project_id,
upstream_client: upstream_client.clone(),
name: agent_name.clone(),
- status_tx: status_txs.remove(&agent_name).flatten(),
new_version_available_tx: new_version_available_txs
.remove(&agent_name)
.flatten(),
@@ -884,22 +859,6 @@ impl AgentServerStore {
})
}
- async fn handle_loading_status_updated(
- this: Entity<Self>,
- envelope: TypedEnvelope<proto::ExternalAgentLoadingStatusUpdated>,
- mut cx: AsyncApp,
- ) -> Result<()> {
- this.update(&mut cx, |this, _| {
- if let Some(agent) = this.external_agents.get_mut(&*envelope.payload.name)
- && let Some(agent) = agent.server.downcast_mut::<RemoteExternalAgentServer>()
- && let Some(status_tx) = &mut agent.status_tx
- {
- status_tx.send(envelope.payload.status.into()).ok();
- }
- });
- Ok(())
- }
-
async fn handle_new_version_available(
this: Entity<Self>,
envelope: TypedEnvelope<proto::NewExternalAgentVersionAvailable>,
@@ -936,7 +895,6 @@ struct RemoteExternalAgentServer {
project_id: u64,
upstream_client: Entity<RemoteClient>,
name: ExternalAgentServerName,
- status_tx: Option<watch::Sender<SharedString>>,
new_version_available_tx: Option<watch::Sender<Option<String>>>,
}
@@ -944,14 +902,12 @@ impl ExternalAgentServer for RemoteExternalAgentServer {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- status_tx: Option<watch::Sender<SharedString>>,
new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
let project_id = self.project_id;
let name = self.name.to_string();
let upstream_client = self.upstream_client.downgrade();
- self.status_tx = status_tx;
self.new_version_available_tx = new_version_available_tx;
cx.spawn(async move |cx| {
let mut response = upstream_client
@@ -1005,7 +961,6 @@ impl ExternalAgentServer for LocalExtensionArchiveAgent {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -1205,7 +1160,6 @@ impl ExternalAgentServer for LocalRegistryArchiveAgent {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -1386,7 +1340,6 @@ impl ExternalAgentServer for LocalRegistryNpxAgent {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -1453,7 +1406,6 @@ impl ExternalAgentServer for LocalCustomAgent {
fn get_command(
&mut self,
extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -10,7 +10,6 @@ impl ExternalAgentServer for NoopExternalAgent {
fn get_command(
&mut self,
_extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
_cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -26,7 +26,6 @@ impl ExternalAgentServer for NoopExternalAgent {
fn get_command(
&mut self,
_extra_env: HashMap<String, String>,
- _status_tx: Option<watch::Sender<SharedString>>,
_new_version_available_tx: Option<watch::Sender<Option<String>>>,
_cx: &mut AsyncApp,
) -> Task<Result<AgentServerCommand>> {
@@ -222,7 +222,7 @@ message ExternalExtensionAgentsUpdated {
message ExternalAgentLoadingStatusUpdated {
uint64 project_id = 1;
string name = 2;
- string status = 3;
+ reserved 3;
}
message NewExternalAgentVersionAvailable {
@@ -2028,7 +2028,6 @@ async fn test_remote_external_agent_server(
.get_command(
HashMap::from_iter([("OTHER_VAR".into(), "other-val".into())]),
None,
- None,
&mut cx.to_async(),
)
})
@@ -47,4 +47,5 @@ fs = { workspace = true, features = ["test-support"] }
gpui = { workspace = true, features = ["test-support"] }
project = { workspace = true, features = ["test-support"] }
settings = { workspace = true, features = ["test-support"] }
-workspace = { workspace = true, features = ["test-support"] }
+workspace = { workspace = true, features = ["test-support"] }
+recent_projects = { workspace = true, features = ["test-support"] }
@@ -2569,15 +2569,15 @@ mod tests {
let path_list = PathList::new(&[std::path::PathBuf::from("/my-project")]);
// Open thread A and keep it generating.
- let connection_a = StubAgentConnection::new();
- open_thread_with_connection(&panel, connection_a.clone(), cx);
+ let connection = StubAgentConnection::new();
+ open_thread_with_connection(&panel, connection.clone(), cx);
send_message(&panel, cx);
let session_id_a = active_session_id(&panel, cx);
save_thread_to_store(&session_id_a, &path_list, cx).await;
cx.update(|_, cx| {
- connection_a.send_update(
+ connection.send_update(
session_id_a.clone(),
acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new("working...".into())),
cx,
@@ -2586,11 +2586,10 @@ mod tests {
cx.run_until_parked();
// Open thread B (idle, default response) β thread A goes to background.
- let connection_b = StubAgentConnection::new();
- connection_b.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk(
+ connection.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk(
acp::ContentChunk::new("Done".into()),
)]);
- open_thread_with_connection(&panel, connection_b, cx);
+ open_thread_with_connection(&panel, connection, cx);
send_message(&panel, cx);
let session_id_b = active_session_id(&panel, cx);