@@ -1,6 +1,7 @@
pub mod extension;
pub mod registry;
+use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -10,6 +11,8 @@ use context_server::{ContextServer, ContextServerCommand, ContextServerId};
use futures::{FutureExt as _, future::join_all};
use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
use registry::ContextServerDescriptorRegistry;
+use remote::RemoteClient;
+use rpc::{AnyProtoClient, TypedEnvelope, proto};
use settings::{Settings as _, SettingsStore};
use util::{ResultExt as _, rel_path::RelPath};
@@ -99,10 +102,12 @@ impl ContextServerState {
pub enum ContextServerConfiguration {
Custom {
command: ContextServerCommand,
+ remote: bool,
},
Extension {
command: ContextServerCommand,
settings: serde_json::Value,
+ remote: bool,
},
Http {
url: url::Url,
@@ -114,12 +119,20 @@ pub enum ContextServerConfiguration {
impl ContextServerConfiguration {
pub fn command(&self) -> Option<&ContextServerCommand> {
match self {
- ContextServerConfiguration::Custom { command } => Some(command),
+ ContextServerConfiguration::Custom { command, .. } => Some(command),
ContextServerConfiguration::Extension { command, .. } => Some(command),
ContextServerConfiguration::Http { .. } => None,
}
}
+ pub fn remote(&self) -> bool {
+ match self {
+ ContextServerConfiguration::Custom { remote, .. } => *remote,
+ ContextServerConfiguration::Extension { remote, .. } => *remote,
+ ContextServerConfiguration::Http { .. } => false,
+ }
+ }
+
pub async fn from_settings(
settings: ContextServerSettings,
id: ContextServerId,
@@ -131,18 +144,22 @@ impl ContextServerConfiguration {
ContextServerSettings::Stdio {
enabled: _,
command,
- } => Some(ContextServerConfiguration::Custom { command }),
+ remote,
+ } => Some(ContextServerConfiguration::Custom { command, remote }),
ContextServerSettings::Extension {
enabled: _,
settings,
+ remote,
} => {
let descriptor =
cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
match descriptor.command(worktree_store, cx).await {
- Ok(command) => {
- Some(ContextServerConfiguration::Extension { command, settings })
- }
+ Ok(command) => Some(ContextServerConfiguration::Extension {
+ command,
+ settings,
+ remote,
+ }),
Err(e) => {
log::error!(
"Failed to create context server configuration from settings: {e:#}"
@@ -171,11 +188,23 @@ impl ContextServerConfiguration {
pub type ContextServerFactory =
Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
+enum ContextServerStoreState {
+ Local {
+ downstream_client: Option<(u64, AnyProtoClient)>,
+ is_headless: bool,
+ },
+ Remote {
+ project_id: u64,
+ upstream_client: Entity<RemoteClient>,
+ },
+}
+
pub struct ContextServerStore {
+ state: ContextServerStoreState,
context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
servers: HashMap<ContextServerId, ContextServerState>,
worktree_store: Entity<WorktreeStore>,
- project: WeakEntity<Project>,
+ project: Option<WeakEntity<Project>>,
registry: Entity<ContextServerDescriptorRegistry>,
update_servers_task: Option<Task<Result<()>>>,
context_server_factory: Option<ContextServerFactory>,
@@ -193,9 +222,31 @@ pub enum Event {
impl EventEmitter<Event> for ContextServerStore {}
impl ContextServerStore {
- pub fn new(
+ pub fn local(
+ worktree_store: Entity<WorktreeStore>,
+ weak_project: Option<WeakEntity<Project>>,
+ headless: bool,
+ cx: &mut Context<Self>,
+ ) -> Self {
+ Self::new_internal(
+ !headless,
+ None,
+ ContextServerDescriptorRegistry::default_global(cx),
+ worktree_store,
+ weak_project,
+ ContextServerStoreState::Local {
+ downstream_client: None,
+ is_headless: headless,
+ },
+ cx,
+ )
+ }
+
+ pub fn remote(
+ project_id: u64,
+ upstream_client: Entity<RemoteClient>,
worktree_store: Entity<WorktreeStore>,
- weak_project: WeakEntity<Project>,
+ weak_project: Option<WeakEntity<Project>>,
cx: &mut Context<Self>,
) -> Self {
Self::new_internal(
@@ -204,10 +255,31 @@ impl ContextServerStore {
ContextServerDescriptorRegistry::default_global(cx),
worktree_store,
weak_project,
+ ContextServerStoreState::Remote {
+ project_id,
+ upstream_client,
+ },
cx,
)
}
+ pub fn init_headless(session: &AnyProtoClient) {
+ session.add_entity_request_handler(Self::handle_get_context_server_command);
+ }
+
+ pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
+ if let ContextServerStoreState::Local {
+ downstream_client, ..
+ } = &mut self.state
+ {
+ *downstream_client = Some((project_id, client));
+ }
+ }
+
+ pub fn is_remote_project(&self) -> bool {
+ matches!(self.state, ContextServerStoreState::Remote { .. })
+ }
+
/// Returns all configured context server ids, excluding the ones that are disabled
pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
self.context_server_settings
@@ -221,10 +293,21 @@ impl ContextServerStore {
pub fn test(
registry: Entity<ContextServerDescriptorRegistry>,
worktree_store: Entity<WorktreeStore>,
- weak_project: WeakEntity<Project>,
+ weak_project: Option<WeakEntity<Project>>,
cx: &mut Context<Self>,
) -> Self {
- Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
+ Self::new_internal(
+ false,
+ None,
+ registry,
+ worktree_store,
+ weak_project,
+ ContextServerStoreState::Local {
+ downstream_client: None,
+ is_headless: false,
+ },
+ cx,
+ )
}
#[cfg(any(test, feature = "test-support"))]
@@ -232,7 +315,7 @@ impl ContextServerStore {
context_server_factory: Option<ContextServerFactory>,
registry: Entity<ContextServerDescriptorRegistry>,
worktree_store: Entity<WorktreeStore>,
- weak_project: WeakEntity<Project>,
+ weak_project: Option<WeakEntity<Project>>,
cx: &mut Context<Self>,
) -> Self {
Self::new_internal(
@@ -241,6 +324,10 @@ impl ContextServerStore {
registry,
worktree_store,
weak_project,
+ ContextServerStoreState::Local {
+ downstream_client: None,
+ is_headless: false,
+ },
cx,
)
}
@@ -264,6 +351,7 @@ impl ContextServerStore {
env: None,
timeout: None,
},
+ remote: false,
});
self.run_server(server, configuration, cx);
}
@@ -273,29 +361,30 @@ impl ContextServerStore {
context_server_factory: Option<ContextServerFactory>,
registry: Entity<ContextServerDescriptorRegistry>,
worktree_store: Entity<WorktreeStore>,
- weak_project: WeakEntity<Project>,
+ weak_project: Option<WeakEntity<Project>>,
+ state: ContextServerStoreState,
cx: &mut Context<Self>,
) -> Self {
- let subscriptions = if maintain_server_loop {
- vec![
- cx.observe(®istry, |this, _registry, cx| {
- this.available_context_servers_changed(cx);
- }),
- cx.observe_global::<SettingsStore>(|this, cx| {
- let settings =
- &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
- if &this.context_server_settings == settings {
- return;
- }
- this.context_server_settings = settings.clone();
- this.available_context_servers_changed(cx);
- }),
- ]
- } else {
- Vec::new()
- };
+ let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
+ let settings =
+ &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
+ if &this.context_server_settings == settings {
+ return;
+ }
+ this.context_server_settings = settings.clone();
+ if maintain_server_loop {
+ this.available_context_servers_changed(cx);
+ }
+ })];
+
+ if maintain_server_loop {
+ subscriptions.push(cx.observe(®istry, |this, _registry, cx| {
+ this.available_context_servers_changed(cx);
+ }));
+ }
let mut this = Self {
+ state,
_subscriptions: subscriptions,
context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
.context_servers
@@ -509,66 +598,191 @@ impl ContextServerStore {
Ok(())
}
- fn create_context_server(
- &self,
+ async fn create_context_server(
+ this: WeakEntity<Self>,
id: ContextServerId,
configuration: Arc<ContextServerConfiguration>,
- cx: &mut Context<Self>,
- ) -> Result<Arc<ContextServer>> {
- let global_timeout =
- Self::resolve_project_settings(&self.worktree_store, cx).context_server_timeout;
+ cx: &mut AsyncApp,
+ ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
+ let remote = configuration.remote();
+ let needs_remote_command = match configuration.as_ref() {
+ ContextServerConfiguration::Custom { .. }
+ | ContextServerConfiguration::Extension { .. } => remote,
+ ContextServerConfiguration::Http { .. } => false,
+ };
- if let Some(factory) = self.context_server_factory.as_ref() {
- return Ok(factory(id, configuration));
- }
+ let (remote_state, is_remote_project) = this.update(cx, |this, _| {
+ let remote_state = match &this.state {
+ ContextServerStoreState::Remote {
+ project_id,
+ upstream_client,
+ } if needs_remote_command => Some((*project_id, upstream_client.clone())),
+ _ => None,
+ };
+ (remote_state, this.is_remote_project())
+ })?;
- match configuration.as_ref() {
- ContextServerConfiguration::Http {
- url,
- headers,
- timeout,
- } => Ok(Arc::new(ContextServer::http(
- id,
- url,
- headers.clone(),
- cx.http_client(),
- cx.background_executor().clone(),
- Some(Duration::from_secs(
- timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
- )),
- )?)),
- _ => {
- let root_path = self
- .project
- .read_with(cx, |project, cx| project.active_project_directory(cx))
- .ok()
- .flatten()
- .or_else(|| {
- self.worktree_store.read_with(cx, |store, cx| {
- store.visible_worktrees(cx).fold(None, |acc, item| {
- if acc.is_none() {
- item.read(cx).root_dir()
- } else {
- acc
- }
- })
+ let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
+ this.project
+ .as_ref()
+ .and_then(|project| {
+ project
+ .read_with(cx, |project, cx| project.active_project_directory(cx))
+ .ok()
+ .flatten()
+ })
+ .or_else(|| {
+ this.worktree_store.read_with(cx, |store, cx| {
+ store.visible_worktrees(cx).fold(None, |acc, item| {
+ if acc.is_none() {
+ item.read(cx).root_dir()
+ } else {
+ acc
+ }
})
- });
-
- let mut command = configuration
- .command()
- .context("Missing command configuration for stdio context server")?
- .clone();
- command.timeout = Some(
- command
- .timeout
- .unwrap_or(global_timeout)
- .min(MAX_TIMEOUT_SECS),
- );
+ })
+ })
+ })?;
- Ok(Arc::new(ContextServer::stdio(id, command, root_path)))
+ let configuration = if let Some((project_id, upstream_client)) = remote_state {
+ let root_dir = root_path.as_ref().map(|p| p.display().to_string());
+
+ let response = upstream_client
+ .update(cx, |client, _| {
+ client
+ .proto_client()
+ .request(proto::GetContextServerCommand {
+ project_id,
+ server_id: id.0.to_string(),
+ root_dir: root_dir.clone(),
+ })
+ })
+ .await?;
+
+ let remote_command = upstream_client.update(cx, |client, _| {
+ client.build_command(
+ Some(response.path),
+ &response.args,
+ &response.env.into_iter().collect(),
+ root_dir,
+ None,
+ )
+ })?;
+
+ let command = ContextServerCommand {
+ path: remote_command.program.into(),
+ args: remote_command.args,
+ env: Some(remote_command.env.into_iter().collect()),
+ timeout: None,
+ };
+
+ Arc::new(ContextServerConfiguration::Custom { command, remote })
+ } else {
+ configuration
+ };
+
+ let server: Arc<ContextServer> = this.update(cx, |this, cx| {
+ let global_timeout =
+ Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
+
+ if let Some(factory) = this.context_server_factory.as_ref() {
+ return anyhow::Ok(factory(id.clone(), configuration.clone()));
}
- }
+
+ match configuration.as_ref() {
+ ContextServerConfiguration::Http {
+ url,
+ headers,
+ timeout,
+ } => anyhow::Ok(Arc::new(ContextServer::http(
+ id,
+ url,
+ headers.clone(),
+ cx.http_client(),
+ cx.background_executor().clone(),
+ Some(Duration::from_secs(
+ timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
+ )),
+ )?)),
+ _ => {
+ let mut command = configuration
+ .command()
+ .context("Missing command configuration for stdio context server")?
+ .clone();
+ command.timeout = Some(
+ command
+ .timeout
+ .unwrap_or(global_timeout)
+ .min(MAX_TIMEOUT_SECS),
+ );
+
+ // Don't pass remote paths as working directory for locally-spawned processes
+ let working_directory = if is_remote_project { None } else { root_path };
+ anyhow::Ok(Arc::new(ContextServer::stdio(
+ id,
+ command,
+ working_directory,
+ )))
+ }
+ }
+ })??;
+
+ Ok((server, configuration))
+ }
+
+ async fn handle_get_context_server_command(
+ this: Entity<Self>,
+ envelope: TypedEnvelope<proto::GetContextServerCommand>,
+ mut cx: AsyncApp,
+ ) -> Result<proto::ContextServerCommand> {
+ let server_id = ContextServerId(envelope.payload.server_id.into());
+
+ let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
+ let ContextServerStoreState::Local {
+ is_headless: true, ..
+ } = &this.state
+ else {
+ anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
+ };
+
+ let settings = this
+ .context_server_settings
+ .get(&server_id.0)
+ .cloned()
+ .or_else(|| {
+ this.registry
+ .read(inner_cx)
+ .context_server_descriptor(&server_id.0)
+ .map(|_| ContextServerSettings::default_extension())
+ })
+ .with_context(|| format!("context server `{}` not found", server_id))?;
+
+ anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
+ })?;
+
+ let configuration = ContextServerConfiguration::from_settings(
+ settings,
+ server_id.clone(),
+ registry,
+ worktree_store,
+ &cx,
+ )
+ .await
+ .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
+
+ let command = configuration
+ .command()
+ .context("context server has no command (HTTP servers don't need RPC)")?;
+
+ Ok(proto::ContextServerCommand {
+ path: command.path.display().to_string(),
+ args: command.args.clone(),
+ env: command
+ .env
+ .clone()
+ .map(|env| env.into_iter().collect())
+ .unwrap_or_default(),
+ })
}
fn resolve_project_settings<'a>(
@@ -651,7 +865,7 @@ impl ContextServerStore {
worktree_store.clone(),
cx,
)
- .map(|config| (id, config))
+ .map(move |config| (id, config))
}))
.await
.into_iter()
@@ -662,7 +876,7 @@ impl ContextServerStore {
let mut servers_to_remove = HashSet::default();
let mut servers_to_stop = HashSet::default();
- this.update(cx, |this, cx| {
+ this.update(cx, |this, _cx| {
for server_id in this.servers.keys() {
// All servers that are not in desired_servers should be removed from the store.
// This can happen if the user removed a server from the context server settings.
@@ -681,8 +895,7 @@ impl ContextServerStore {
let existing_config = state.as_ref().map(|state| state.configuration());
if existing_config.as_deref() != Some(&config) || is_stopped {
let config = Arc::new(config);
- let server = this.create_context_server(id.clone(), config.clone(), cx)?;
- servers_to_start.push((server, config));
+ servers_to_start.push((id.clone(), config));
if this.servers.contains_key(&id) {
servers_to_stop.insert(id);
}
@@ -692,18 +905,25 @@ impl ContextServerStore {
anyhow::Ok(())
})??;
- this.update(cx, |this, cx| {
+ this.update(cx, |this, inner_cx| {
for id in servers_to_stop {
- this.stop_server(&id, cx)?;
+ this.stop_server(&id, inner_cx)?;
}
for id in servers_to_remove {
- this.remove_server(&id, cx)?;
- }
- for (server, config) in servers_to_start {
- this.run_server(server, config, cx);
+ this.remove_server(&id, inner_cx)?;
}
anyhow::Ok(())
- })?
+ })??;
+
+ for (id, config) in servers_to_start {
+ let (server, config) =
+ Self::create_context_server(this.clone(), id, config, cx).await?;
+ this.update(cx, |this, cx| {
+ this.run_server(server, config, cx);
+ })?;
+ }
+
+ Ok(())
}
}
@@ -733,7 +953,7 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
@@ -807,7 +1027,7 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
@@ -862,7 +1082,7 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
@@ -942,6 +1162,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": true
}),
@@ -979,6 +1200,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -998,6 +1220,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -1025,6 +1248,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -1034,6 +1258,7 @@ mod tests {
server_2_id.0.clone(),
settings::ContextServerSettingsContent::Stdio {
enabled: true,
+ remote: false,
command: ContextServerCommand {
path: "somebinary".into(),
args: vec!["arg".to_string()],
@@ -1066,6 +1291,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -1075,6 +1301,7 @@ mod tests {
server_2_id.0.clone(),
settings::ContextServerSettingsContent::Stdio {
enabled: true,
+ remote: false,
command: ContextServerCommand {
path: "somebinary".into(),
args: vec!["anotherArg".to_string()],
@@ -1102,6 +1329,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -1125,6 +1353,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Extension {
enabled: true,
+ remote: false,
settings: json!({
"somevalue": false
}),
@@ -1169,6 +1398,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Stdio {
enabled: true,
+ remote: false,
command: ContextServerCommand {
path: "somebinary".into(),
args: vec!["arg".to_string()],
@@ -1205,6 +1435,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Stdio {
enabled: false,
+ remote: false,
command: ContextServerCommand {
path: "somebinary".into(),
args: vec!["arg".to_string()],
@@ -1234,6 +1465,7 @@ mod tests {
server_1_id.0.clone(),
settings::ContextServerSettingsContent::Stdio {
enabled: true,
+ remote: false,
command: ContextServerCommand {
path: "somebinary".into(),
args: vec!["arg".to_string()],
@@ -1362,23 +1594,23 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
- let result = store.update(cx, |store, cx| {
- store.create_context_server(
- ContextServerId("test-server".into()),
- Arc::new(ContextServerConfiguration::Http {
- url: url::Url::parse("http://localhost:8080")
- .expect("Failed to parse test URL"),
- headers: Default::default(),
- timeout: None,
- }),
- cx,
- )
- });
+ let mut async_cx = cx.to_async();
+ let result = ContextServerStore::create_context_server(
+ store.downgrade(),
+ ContextServerId("test-server".into()),
+ Arc::new(ContextServerConfiguration::Http {
+ url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
+ headers: Default::default(),
+ timeout: None,
+ }),
+ &mut async_cx,
+ )
+ .await;
assert!(
result.is_ok(),
@@ -1420,23 +1652,23 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
- let result = store.update(cx, |store, cx| {
- store.create_context_server(
- ContextServerId("test-server".into()),
- Arc::new(ContextServerConfiguration::Http {
- url: url::Url::parse("http://localhost:8080")
- .expect("Failed to parse test URL"),
- headers: Default::default(),
- timeout: Some(120),
- }),
- cx,
- )
- });
+ let mut async_cx = cx.to_async();
+ let result = ContextServerStore::create_context_server(
+ store.downgrade(),
+ ContextServerId("test-server".into()),
+ Arc::new(ContextServerConfiguration::Http {
+ url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
+ headers: Default::default(),
+ timeout: Some(120),
+ }),
+ &mut async_cx,
+ )
+ .await;
assert!(
result.is_ok(),
@@ -1453,25 +1685,27 @@ mod tests {
ContextServerStore::test(
registry.clone(),
project.read(cx).worktree_store(),
- project.downgrade(),
+ Some(project.downgrade()),
cx,
)
});
- let result = store.update(cx, |store, cx| {
- store.create_context_server(
- ContextServerId("stdio-server".into()),
- Arc::new(ContextServerConfiguration::Custom {
- command: ContextServerCommand {
- path: "/usr/bin/node".into(),
- args: vec!["server.js".into()],
- env: None,
- timeout: Some(180000),
- },
- }),
- cx,
- )
- });
+ let mut async_cx = cx.to_async();
+ let result = ContextServerStore::create_context_server(
+ store.downgrade(),
+ ContextServerId("stdio-server".into()),
+ Arc::new(ContextServerConfiguration::Custom {
+ command: ContextServerCommand {
+ path: "/usr/bin/node".into(),
+ args: vec!["server.js".into()],
+ env: None,
+ timeout: Some(180000),
+ },
+ remote: false,
+ }),
+ &mut async_cx,
+ )
+ .await;
assert!(
result.is_ok(),