1pub mod extension;
2pub mod registry;
3
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context as _, Result};
9use collections::{HashMap, HashSet};
10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
11use futures::{FutureExt as _, future::join_all};
12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
13use itertools::Itertools;
14use registry::ContextServerDescriptorRegistry;
15use remote::RemoteClient;
16use rpc::{AnyProtoClient, TypedEnvelope, proto};
17use settings::{Settings as _, SettingsStore};
18use util::{ResultExt as _, rel_path::RelPath};
19
20use crate::{
21 DisableAiSettings, Project,
22 project_settings::{ContextServerSettings, ProjectSettings},
23 worktree_store::WorktreeStore,
24};
25
/// Upper bound, in seconds, for any context server request timeout.
///
/// Configured timeouts are clamped to this value so that an extremely large
/// setting cannot tie up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 10 * 60; // 10 minutes
29
/// Initializes this module by delegating to [`extension::init`].
pub fn init(cx: &mut App) {
    extension::init(cx);
}
33
// Actions declared under the `context_server` namespace.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
41
/// Publicly visible status of a context server, derived from the store's
/// internal per-server state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// The server's start task has been spawned but has not finished yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server is known to the store but is not running.
    Stopped,
    /// The server failed to start; carries the error message.
    Error(Arc<str>),
}
49
50impl ContextServerStatus {
51 fn from_state(state: &ContextServerState) -> Self {
52 match state {
53 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
54 ContextServerState::Running { .. } => ContextServerStatus::Running,
55 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
56 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
57 }
58 }
59}
60
/// Internal bookkeeping for a single context server tracked by the store.
///
/// Every variant keeps the server handle together with the configuration it
/// was created from.
enum ContextServerState {
    /// The server is being started.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        /// The task driving the start; stored alongside the state it belongs to.
        _task: Task<()>,
    },
    /// The server started successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server failed to start.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        /// Message of the error reported by the failed start.
        error: Arc<str>,
    },
}
81
82impl ContextServerState {
83 pub fn server(&self) -> Arc<ContextServer> {
84 match self {
85 ContextServerState::Starting { server, .. } => server.clone(),
86 ContextServerState::Running { server, .. } => server.clone(),
87 ContextServerState::Stopped { server, .. } => server.clone(),
88 ContextServerState::Error { server, .. } => server.clone(),
89 }
90 }
91
92 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
93 match self {
94 ContextServerState::Starting { configuration, .. } => configuration.clone(),
95 ContextServerState::Running { configuration, .. } => configuration.clone(),
96 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
97 ContextServerState::Error { configuration, .. } => configuration.clone(),
98 }
99 }
100}
101
/// Resolved configuration used to create a context server.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A server launched from a command given directly in the settings.
    Custom {
        command: ContextServerCommand,
        /// Copied from the `remote` flag of the originating settings entry.
        remote: bool,
    },
    /// A server whose command is supplied by a registered descriptor.
    Extension {
        command: ContextServerCommand,
        /// Settings value carried over unchanged from the settings entry.
        settings: serde_json::Value,
        /// Copied from the `remote` flag of the originating settings entry.
        remote: bool,
    },
    /// A server reached over HTTP rather than launched from a command.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server timeout in seconds; the store falls back to the
        /// global timeout when this is `None`.
        timeout: Option<u64>,
    },
}
119
120impl ContextServerConfiguration {
121 pub fn command(&self) -> Option<&ContextServerCommand> {
122 match self {
123 ContextServerConfiguration::Custom { command, .. } => Some(command),
124 ContextServerConfiguration::Extension { command, .. } => Some(command),
125 ContextServerConfiguration::Http { .. } => None,
126 }
127 }
128
129 pub fn remote(&self) -> bool {
130 match self {
131 ContextServerConfiguration::Custom { remote, .. } => *remote,
132 ContextServerConfiguration::Extension { remote, .. } => *remote,
133 ContextServerConfiguration::Http { .. } => false,
134 }
135 }
136
137 pub async fn from_settings(
138 settings: ContextServerSettings,
139 id: ContextServerId,
140 registry: Entity<ContextServerDescriptorRegistry>,
141 worktree_store: Entity<WorktreeStore>,
142 cx: &AsyncApp,
143 ) -> Option<Self> {
144 match settings {
145 ContextServerSettings::Stdio {
146 enabled: _,
147 command,
148 remote,
149 } => Some(ContextServerConfiguration::Custom { command, remote }),
150 ContextServerSettings::Extension {
151 enabled: _,
152 settings,
153 remote,
154 } => {
155 let descriptor =
156 cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
157
158 match descriptor.command(worktree_store, cx).await {
159 Ok(command) => Some(ContextServerConfiguration::Extension {
160 command,
161 settings,
162 remote,
163 }),
164 Err(e) => {
165 log::error!(
166 "Failed to create context server configuration from settings: {e:#}"
167 );
168 None
169 }
170 }
171 }
172 ContextServerSettings::Http {
173 enabled: _,
174 url,
175 headers: auth,
176 timeout,
177 } => {
178 let url = url::Url::parse(&url).log_err()?;
179 Some(ContextServerConfiguration::Http {
180 url,
181 headers: auth,
182 timeout,
183 })
184 }
185 }
186 }
187}
188
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When the store has a factory, it is used in place of the built-in server
/// construction.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
191
/// Whether the store belongs to a local project or to a project backed by a
/// remote client.
enum ContextServerStoreState {
    /// The store belongs to a local project.
    Local {
        /// Project id and client recorded by `shared`; `None` until then.
        downstream_client: Option<(u64, AnyProtoClient)>,
        /// Whether the store was created in headless mode. Only headless stores
        /// answer `GetContextServerCommand` requests.
        is_headless: bool,
    },
    /// The store belongs to a project reached through a remote client.
    Remote {
        project_id: u64,
        /// Client used to request server commands from the remote side.
        upstream_client: Entity<RemoteClient>,
    },
}
202
/// Tracks the context servers of a project and keeps them in sync with the
/// settings and the descriptor registry.
pub struct ContextServerStore {
    /// Local or remote mode of this store.
    state: ContextServerStoreState,
    /// Context server settings resolved from the project settings, keyed by server id.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Current state of every tracked server.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Sorted, de-duplicated ids rebuilt by `populate_server_ids`.
    server_ids: Vec<ContextServerId>,
    worktree_store: Entity<WorktreeStore>,
    /// Project used to look up the active project directory, if any.
    project: Option<WeakEntity<Project>>,
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The in-flight maintenance task, if one is running.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override for constructing servers.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while a maintenance task is already running,
    /// so that another pass is scheduled once it finishes.
    needs_server_update: bool,
    /// Last observed value of the "disable AI" setting.
    ai_disabled: bool,
    /// Subscriptions created in `new_internal`, stored for the lifetime of the store.
    _subscriptions: Vec<Subscription>,
}
217
/// Event emitted by [`ContextServerStore`] whenever the state of a server is
/// updated or a server is removed from the store.
pub struct ServerStatusChangedEvent {
    /// Id of the server whose status changed.
    pub server_id: ContextServerId,
    /// The server's new status.
    pub status: ContextServerStatus,
}

impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
224
225impl ContextServerStore {
226 pub fn local(
227 worktree_store: Entity<WorktreeStore>,
228 weak_project: Option<WeakEntity<Project>>,
229 headless: bool,
230 cx: &mut Context<Self>,
231 ) -> Self {
232 Self::new_internal(
233 !headless,
234 None,
235 ContextServerDescriptorRegistry::default_global(cx),
236 worktree_store,
237 weak_project,
238 ContextServerStoreState::Local {
239 downstream_client: None,
240 is_headless: headless,
241 },
242 cx,
243 )
244 }
245
246 pub fn remote(
247 project_id: u64,
248 upstream_client: Entity<RemoteClient>,
249 worktree_store: Entity<WorktreeStore>,
250 weak_project: Option<WeakEntity<Project>>,
251 cx: &mut Context<Self>,
252 ) -> Self {
253 Self::new_internal(
254 true,
255 None,
256 ContextServerDescriptorRegistry::default_global(cx),
257 worktree_store,
258 weak_project,
259 ContextServerStoreState::Remote {
260 project_id,
261 upstream_client,
262 },
263 cx,
264 )
265 }
266
267 pub fn init_headless(session: &AnyProtoClient) {
268 session.add_entity_request_handler(Self::handle_get_context_server_command);
269 }
270
271 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
272 if let ContextServerStoreState::Local {
273 downstream_client, ..
274 } = &mut self.state
275 {
276 *downstream_client = Some((project_id, client));
277 }
278 }
279
280 pub fn is_remote_project(&self) -> bool {
281 matches!(self.state, ContextServerStoreState::Remote { .. })
282 }
283
284 /// Returns all configured context server ids, excluding the ones that are disabled
285 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
286 self.context_server_settings
287 .iter()
288 .filter(|(_, settings)| settings.enabled())
289 .map(|(id, _)| ContextServerId(id.clone()))
290 .collect()
291 }
292
293 #[cfg(feature = "test-support")]
294 pub fn test(
295 registry: Entity<ContextServerDescriptorRegistry>,
296 worktree_store: Entity<WorktreeStore>,
297 weak_project: Option<WeakEntity<Project>>,
298 cx: &mut Context<Self>,
299 ) -> Self {
300 Self::new_internal(
301 false,
302 None,
303 registry,
304 worktree_store,
305 weak_project,
306 ContextServerStoreState::Local {
307 downstream_client: None,
308 is_headless: false,
309 },
310 cx,
311 )
312 }
313
314 #[cfg(feature = "test-support")]
315 pub fn test_maintain_server_loop(
316 context_server_factory: Option<ContextServerFactory>,
317 registry: Entity<ContextServerDescriptorRegistry>,
318 worktree_store: Entity<WorktreeStore>,
319 weak_project: Option<WeakEntity<Project>>,
320 cx: &mut Context<Self>,
321 ) -> Self {
322 Self::new_internal(
323 true,
324 context_server_factory,
325 registry,
326 worktree_store,
327 weak_project,
328 ContextServerStoreState::Local {
329 downstream_client: None,
330 is_headless: false,
331 },
332 cx,
333 )
334 }
335
336 #[cfg(feature = "test-support")]
337 pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
338 self.context_server_factory = Some(factory);
339 }
340
341 #[cfg(feature = "test-support")]
342 pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
343 &self.registry
344 }
345
346 #[cfg(feature = "test-support")]
347 pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
348 let configuration = Arc::new(ContextServerConfiguration::Custom {
349 command: ContextServerCommand {
350 path: "test".into(),
351 args: vec![],
352 env: None,
353 timeout: None,
354 },
355 remote: false,
356 });
357 self.run_server(server, configuration, cx);
358 }
359
360 fn new_internal(
361 maintain_server_loop: bool,
362 context_server_factory: Option<ContextServerFactory>,
363 registry: Entity<ContextServerDescriptorRegistry>,
364 worktree_store: Entity<WorktreeStore>,
365 weak_project: Option<WeakEntity<Project>>,
366 state: ContextServerStoreState,
367 cx: &mut Context<Self>,
368 ) -> Self {
369 let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
370 let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
371 let ai_was_disabled = this.ai_disabled;
372 this.ai_disabled = ai_disabled;
373
374 let settings =
375 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
376 let settings_changed = &this.context_server_settings != settings;
377
378 if settings_changed {
379 this.context_server_settings = settings.clone();
380 }
381
382 // When AI is disabled, stop all running servers
383 if ai_disabled && maintain_server_loop {
384 let server_ids: Vec<_> = this.servers.keys().cloned().collect();
385 for id in server_ids {
386 this.stop_server(&id, cx).log_err();
387 }
388 return;
389 }
390
391 // Trigger updates if AI was re-enabled or settings changed
392 if maintain_server_loop && (ai_was_disabled || settings_changed) {
393 this.available_context_servers_changed(cx);
394 }
395 })];
396
397 if maintain_server_loop {
398 subscriptions.push(cx.observe(®istry, |this, _registry, cx| {
399 if !DisableAiSettings::get_global(cx).disable_ai {
400 this.available_context_servers_changed(cx);
401 }
402 }));
403 }
404
405 let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
406 let mut this = Self {
407 state,
408 _subscriptions: subscriptions,
409 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
410 .context_servers
411 .clone(),
412 worktree_store,
413 project: weak_project,
414 registry,
415 needs_server_update: false,
416 ai_disabled,
417 servers: HashMap::default(),
418 server_ids: Default::default(),
419 update_servers_task: None,
420 context_server_factory,
421 };
422 if maintain_server_loop && !DisableAiSettings::get_global(cx).disable_ai {
423 this.available_context_servers_changed(cx);
424 }
425 this
426 }
427
428 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
429 self.servers.get(id).map(|state| state.server())
430 }
431
432 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
433 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
434 Some(server.clone())
435 } else {
436 None
437 }
438 }
439
440 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
441 self.servers.get(id).map(ContextServerStatus::from_state)
442 }
443
444 pub fn configuration_for_server(
445 &self,
446 id: &ContextServerId,
447 ) -> Option<Arc<ContextServerConfiguration>> {
448 self.servers.get(id).map(|state| state.configuration())
449 }
450
451 /// Returns a sorted slice of available unique context server IDs. Within the
452 /// slice, context servers which have `mcp-server-` as a prefix in their ID will
453 /// appear after servers that do not have this prefix in their ID.
454 pub fn server_ids(&self) -> &[ContextServerId] {
455 self.server_ids.as_slice()
456 }
457
458 fn populate_server_ids(&mut self, cx: &App) {
459 self.server_ids = self
460 .servers
461 .keys()
462 .cloned()
463 .chain(
464 self.registry
465 .read(cx)
466 .context_server_descriptors()
467 .into_iter()
468 .map(|(id, _)| ContextServerId(id)),
469 )
470 .chain(
471 self.context_server_settings
472 .keys()
473 .map(|id| ContextServerId(id.clone())),
474 )
475 .unique()
476 .sorted_unstable_by(
477 // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
478 |a, b| {
479 const MCP_PREFIX: &str = "mcp-server-";
480 match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
481 // If one has mcp-server- prefix and other doesn't, non-mcp comes first
482 (Some(_), None) => std::cmp::Ordering::Greater,
483 (None, Some(_)) => std::cmp::Ordering::Less,
484 // If both have same prefix status, sort by appropriate key
485 (Some(a), Some(b)) => a.cmp(b),
486 (None, None) => a.0.cmp(&b.0),
487 }
488 },
489 )
490 .collect();
491 }
492
493 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
494 self.servers
495 .values()
496 .filter_map(|state| {
497 if let ContextServerState::Running { server, .. } = state {
498 Some(server.clone())
499 } else {
500 None
501 }
502 })
503 .collect()
504 }
505
506 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
507 cx.spawn(async move |this, cx| {
508 let this = this.upgrade().context("Context server store dropped")?;
509 let settings = this
510 .update(cx, |this, _| {
511 this.context_server_settings.get(&server.id().0).cloned()
512 })
513 .context("Failed to get context server settings")?;
514
515 if !settings.enabled() {
516 return anyhow::Ok(());
517 }
518
519 let (registry, worktree_store) = this.update(cx, |this, _| {
520 (this.registry.clone(), this.worktree_store.clone())
521 });
522 let configuration = ContextServerConfiguration::from_settings(
523 settings,
524 server.id(),
525 registry,
526 worktree_store,
527 cx,
528 )
529 .await
530 .context("Failed to create context server configuration")?;
531
532 this.update(cx, |this, cx| {
533 this.run_server(server, Arc::new(configuration), cx)
534 });
535 Ok(())
536 })
537 .detach_and_log_err(cx);
538 }
539
540 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
541 if matches!(
542 self.servers.get(id),
543 Some(ContextServerState::Stopped { .. })
544 ) {
545 return Ok(());
546 }
547
548 let state = self
549 .servers
550 .remove(id)
551 .context("Context server not found")?;
552
553 let server = state.server();
554 let configuration = state.configuration();
555 let mut result = Ok(());
556 if let ContextServerState::Running { server, .. } = &state {
557 result = server.stop();
558 }
559 drop(state);
560
561 self.update_server_state(
562 id.clone(),
563 ContextServerState::Stopped {
564 configuration,
565 server,
566 },
567 cx,
568 );
569
570 result
571 }
572
573 fn run_server(
574 &mut self,
575 server: Arc<ContextServer>,
576 configuration: Arc<ContextServerConfiguration>,
577 cx: &mut Context<Self>,
578 ) {
579 let id = server.id();
580 if matches!(
581 self.servers.get(&id),
582 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
583 ) {
584 self.stop_server(&id, cx).log_err();
585 }
586 let task = cx.spawn({
587 let id = server.id();
588 let server = server.clone();
589 let configuration = configuration.clone();
590
591 async move |this, cx| {
592 match server.clone().start(cx).await {
593 Ok(_) => {
594 debug_assert!(server.client().is_some());
595
596 this.update(cx, |this, cx| {
597 this.update_server_state(
598 id.clone(),
599 ContextServerState::Running {
600 server,
601 configuration,
602 },
603 cx,
604 )
605 })
606 .log_err()
607 }
608 Err(err) => {
609 log::error!("{} context server failed to start: {}", id, err);
610 this.update(cx, |this, cx| {
611 this.update_server_state(
612 id.clone(),
613 ContextServerState::Error {
614 configuration,
615 server,
616 error: err.to_string().into(),
617 },
618 cx,
619 )
620 })
621 .log_err()
622 }
623 };
624 }
625 });
626
627 self.update_server_state(
628 id.clone(),
629 ContextServerState::Starting {
630 configuration,
631 _task: task,
632 server,
633 },
634 cx,
635 );
636 }
637
638 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
639 let state = self
640 .servers
641 .remove(id)
642 .context("Context server not found")?;
643 drop(state);
644 cx.emit(ServerStatusChangedEvent {
645 server_id: id.clone(),
646 status: ContextServerStatus::Stopped,
647 });
648 Ok(())
649 }
650
651 pub async fn create_context_server(
652 this: WeakEntity<Self>,
653 id: ContextServerId,
654 configuration: Arc<ContextServerConfiguration>,
655 cx: &mut AsyncApp,
656 ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
657 let remote = configuration.remote();
658 let needs_remote_command = match configuration.as_ref() {
659 ContextServerConfiguration::Custom { .. }
660 | ContextServerConfiguration::Extension { .. } => remote,
661 ContextServerConfiguration::Http { .. } => false,
662 };
663
664 let (remote_state, is_remote_project) = this.update(cx, |this, _| {
665 let remote_state = match &this.state {
666 ContextServerStoreState::Remote {
667 project_id,
668 upstream_client,
669 } if needs_remote_command => Some((*project_id, upstream_client.clone())),
670 _ => None,
671 };
672 (remote_state, this.is_remote_project())
673 })?;
674
675 let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
676 this.project
677 .as_ref()
678 .and_then(|project| {
679 project
680 .read_with(cx, |project, cx| project.active_project_directory(cx))
681 .ok()
682 .flatten()
683 })
684 .or_else(|| {
685 this.worktree_store.read_with(cx, |store, cx| {
686 store.visible_worktrees(cx).fold(None, |acc, item| {
687 if acc.is_none() {
688 item.read(cx).root_dir()
689 } else {
690 acc
691 }
692 })
693 })
694 })
695 })?;
696
697 let configuration = if let Some((project_id, upstream_client)) = remote_state {
698 let root_dir = root_path.as_ref().map(|p| p.display().to_string());
699
700 let response = upstream_client
701 .update(cx, |client, _| {
702 client
703 .proto_client()
704 .request(proto::GetContextServerCommand {
705 project_id,
706 server_id: id.0.to_string(),
707 root_dir: root_dir.clone(),
708 })
709 })
710 .await?;
711
712 let remote_command = upstream_client.update(cx, |client, _| {
713 client.build_command(
714 Some(response.path),
715 &response.args,
716 &response.env.into_iter().collect(),
717 root_dir,
718 None,
719 )
720 })?;
721
722 let command = ContextServerCommand {
723 path: remote_command.program.into(),
724 args: remote_command.args,
725 env: Some(remote_command.env.into_iter().collect()),
726 timeout: None,
727 };
728
729 Arc::new(ContextServerConfiguration::Custom { command, remote })
730 } else {
731 configuration
732 };
733
734 let server: Arc<ContextServer> = this.update(cx, |this, cx| {
735 let global_timeout =
736 Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
737
738 if let Some(factory) = this.context_server_factory.as_ref() {
739 return anyhow::Ok(factory(id.clone(), configuration.clone()));
740 }
741
742 match configuration.as_ref() {
743 ContextServerConfiguration::Http {
744 url,
745 headers,
746 timeout,
747 } => anyhow::Ok(Arc::new(ContextServer::http(
748 id,
749 url,
750 headers.clone(),
751 cx.http_client(),
752 cx.background_executor().clone(),
753 Some(Duration::from_secs(
754 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
755 )),
756 )?)),
757 _ => {
758 let mut command = configuration
759 .command()
760 .context("Missing command configuration for stdio context server")?
761 .clone();
762 command.timeout = Some(
763 command
764 .timeout
765 .unwrap_or(global_timeout)
766 .min(MAX_TIMEOUT_SECS),
767 );
768
769 // Don't pass remote paths as working directory for locally-spawned processes
770 let working_directory = if is_remote_project { None } else { root_path };
771 anyhow::Ok(Arc::new(ContextServer::stdio(
772 id,
773 command,
774 working_directory,
775 )))
776 }
777 }
778 })??;
779
780 Ok((server, configuration))
781 }
782
783 async fn handle_get_context_server_command(
784 this: Entity<Self>,
785 envelope: TypedEnvelope<proto::GetContextServerCommand>,
786 mut cx: AsyncApp,
787 ) -> Result<proto::ContextServerCommand> {
788 let server_id = ContextServerId(envelope.payload.server_id.into());
789
790 let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
791 let ContextServerStoreState::Local {
792 is_headless: true, ..
793 } = &this.state
794 else {
795 anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
796 };
797
798 let settings = this
799 .context_server_settings
800 .get(&server_id.0)
801 .cloned()
802 .or_else(|| {
803 this.registry
804 .read(inner_cx)
805 .context_server_descriptor(&server_id.0)
806 .map(|_| ContextServerSettings::default_extension())
807 })
808 .with_context(|| format!("context server `{}` not found", server_id))?;
809
810 anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
811 })?;
812
813 let configuration = ContextServerConfiguration::from_settings(
814 settings,
815 server_id.clone(),
816 registry,
817 worktree_store,
818 &cx,
819 )
820 .await
821 .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
822
823 let command = configuration
824 .command()
825 .context("context server has no command (HTTP servers don't need RPC)")?;
826
827 Ok(proto::ContextServerCommand {
828 path: command.path.display().to_string(),
829 args: command.args.clone(),
830 env: command
831 .env
832 .clone()
833 .map(|env| env.into_iter().collect())
834 .unwrap_or_default(),
835 })
836 }
837
838 fn resolve_project_settings<'a>(
839 worktree_store: &'a Entity<WorktreeStore>,
840 cx: &'a App,
841 ) -> &'a ProjectSettings {
842 let location = worktree_store
843 .read(cx)
844 .visible_worktrees(cx)
845 .next()
846 .map(|worktree| settings::SettingsLocation {
847 worktree_id: worktree.read(cx).id(),
848 path: RelPath::empty(),
849 });
850 ProjectSettings::get(location, cx)
851 }
852
853 fn update_server_state(
854 &mut self,
855 id: ContextServerId,
856 state: ContextServerState,
857 cx: &mut Context<Self>,
858 ) {
859 let status = ContextServerStatus::from_state(&state);
860 self.servers.insert(id.clone(), state);
861 cx.emit(ServerStatusChangedEvent {
862 server_id: id,
863 status,
864 });
865 }
866
867 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
868 if self.update_servers_task.is_some() {
869 self.needs_server_update = true;
870 } else {
871 self.needs_server_update = false;
872 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
873 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
874 log::error!("Error maintaining context servers: {}", err);
875 }
876
877 this.update(cx, |this, cx| {
878 this.populate_server_ids(cx);
879 this.update_servers_task.take();
880 if this.needs_server_update {
881 this.available_context_servers_changed(cx);
882 }
883 })?;
884
885 Ok(())
886 }));
887 }
888 }
889
890 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
891 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
892 (
893 this.context_server_settings.clone(),
894 this.registry.clone(),
895 this.worktree_store.clone(),
896 )
897 })?;
898
899 for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
900 configured_servers
901 .entry(id)
902 .or_insert(ContextServerSettings::default_extension());
903 }
904
905 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
906 configured_servers
907 .into_iter()
908 .partition(|(_, settings)| settings.enabled());
909
910 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
911 let id = ContextServerId(id);
912 ContextServerConfiguration::from_settings(
913 settings,
914 id.clone(),
915 registry.clone(),
916 worktree_store.clone(),
917 cx,
918 )
919 .map(move |config| (id, config))
920 }))
921 .await
922 .into_iter()
923 .filter_map(|(id, config)| config.map(|config| (id, config)))
924 .collect::<HashMap<_, _>>();
925
926 let mut servers_to_start = Vec::new();
927 let mut servers_to_remove = HashSet::default();
928 let mut servers_to_stop = HashSet::default();
929
930 this.update(cx, |this, _cx| {
931 for server_id in this.servers.keys() {
932 // All servers that are not in desired_servers should be removed from the store.
933 // This can happen if the user removed a server from the context server settings.
934 if !configured_servers.contains_key(server_id) {
935 if disabled_servers.contains_key(&server_id.0) {
936 servers_to_stop.insert(server_id.clone());
937 } else {
938 servers_to_remove.insert(server_id.clone());
939 }
940 }
941 }
942
943 for (id, config) in configured_servers {
944 let state = this.servers.get(&id);
945 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
946 let existing_config = state.as_ref().map(|state| state.configuration());
947 if existing_config.as_deref() != Some(&config) || is_stopped {
948 let config = Arc::new(config);
949 servers_to_start.push((id.clone(), config));
950 if this.servers.contains_key(&id) {
951 servers_to_stop.insert(id);
952 }
953 }
954 }
955
956 anyhow::Ok(())
957 })??;
958
959 this.update(cx, |this, inner_cx| {
960 for id in servers_to_stop {
961 this.stop_server(&id, inner_cx)?;
962 }
963 for id in servers_to_remove {
964 this.remove_server(&id, inner_cx)?;
965 }
966 anyhow::Ok(())
967 })??;
968
969 for (id, config) in servers_to_start {
970 let (server, config) =
971 Self::create_context_server(this.clone(), id, config, cx).await?;
972 this.update(cx, |this, cx| {
973 this.run_server(server, config, cx);
974 })?;
975 }
976
977 Ok(())
978 }
979}