1pub mod extension;
2pub mod registry;
3
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context as _, Result};
9use collections::{HashMap, HashSet};
10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
11use futures::{FutureExt as _, future::Either, future::join_all};
12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
13use itertools::Itertools;
14use registry::ContextServerDescriptorRegistry;
15use remote::RemoteClient;
16use rpc::{AnyProtoClient, TypedEnvelope, proto};
17use settings::{Settings as _, SettingsStore};
18use util::{ResultExt as _, rel_path::RelPath};
19
20use crate::{
21 DisableAiSettings, Project,
22 project_settings::{ContextServerSettings, ProjectSettings},
23 worktree_store::WorktreeStore,
24};
25
26/// Maximum timeout for context server requests
27/// Prevents extremely large timeout values from tying up resources indefinitely.
28const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
29
30pub fn init(cx: &mut App) {
31 extension::init(cx);
32}
33
34actions!(
35 context_server,
36 [
37 /// Restarts the context server.
38 Restart
39 ]
40);
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43pub enum ContextServerStatus {
44 Starting,
45 Running,
46 Stopped,
47 Error(Arc<str>),
48}
49
50impl ContextServerStatus {
51 fn from_state(state: &ContextServerState) -> Self {
52 match state {
53 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
54 ContextServerState::Running { .. } => ContextServerStatus::Running,
55 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
56 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
57 }
58 }
59}
60
61enum ContextServerState {
62 Starting {
63 server: Arc<ContextServer>,
64 configuration: Arc<ContextServerConfiguration>,
65 _task: Task<()>,
66 },
67 Running {
68 server: Arc<ContextServer>,
69 configuration: Arc<ContextServerConfiguration>,
70 },
71 Stopped {
72 server: Arc<ContextServer>,
73 configuration: Arc<ContextServerConfiguration>,
74 },
75 Error {
76 server: Arc<ContextServer>,
77 configuration: Arc<ContextServerConfiguration>,
78 error: Arc<str>,
79 },
80}
81
82impl ContextServerState {
83 pub fn server(&self) -> Arc<ContextServer> {
84 match self {
85 ContextServerState::Starting { server, .. } => server.clone(),
86 ContextServerState::Running { server, .. } => server.clone(),
87 ContextServerState::Stopped { server, .. } => server.clone(),
88 ContextServerState::Error { server, .. } => server.clone(),
89 }
90 }
91
92 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
93 match self {
94 ContextServerState::Starting { configuration, .. } => configuration.clone(),
95 ContextServerState::Running { configuration, .. } => configuration.clone(),
96 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
97 ContextServerState::Error { configuration, .. } => configuration.clone(),
98 }
99 }
100}
101
102#[derive(Debug, PartialEq, Eq)]
103pub enum ContextServerConfiguration {
104 Custom {
105 command: ContextServerCommand,
106 remote: bool,
107 },
108 Extension {
109 command: ContextServerCommand,
110 settings: serde_json::Value,
111 remote: bool,
112 },
113 Http {
114 url: url::Url,
115 headers: HashMap<String, String>,
116 timeout: Option<u64>,
117 },
118}
119
120impl ContextServerConfiguration {
121 pub fn command(&self) -> Option<&ContextServerCommand> {
122 match self {
123 ContextServerConfiguration::Custom { command, .. } => Some(command),
124 ContextServerConfiguration::Extension { command, .. } => Some(command),
125 ContextServerConfiguration::Http { .. } => None,
126 }
127 }
128
129 pub fn remote(&self) -> bool {
130 match self {
131 ContextServerConfiguration::Custom { remote, .. } => *remote,
132 ContextServerConfiguration::Extension { remote, .. } => *remote,
133 ContextServerConfiguration::Http { .. } => false,
134 }
135 }
136
137 pub async fn from_settings(
138 settings: ContextServerSettings,
139 id: ContextServerId,
140 registry: Entity<ContextServerDescriptorRegistry>,
141 worktree_store: Entity<WorktreeStore>,
142 cx: &AsyncApp,
143 ) -> Option<Self> {
144 const EXTENSION_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
145
146 match settings {
147 ContextServerSettings::Stdio {
148 enabled: _,
149 command,
150 remote,
151 } => Some(ContextServerConfiguration::Custom { command, remote }),
152 ContextServerSettings::Extension {
153 enabled: _,
154 settings,
155 remote,
156 } => {
157 let descriptor =
158 cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
159
160 let command_future = descriptor.command(worktree_store, cx);
161 let timeout_future = cx.background_executor().timer(EXTENSION_COMMAND_TIMEOUT);
162
163 match futures::future::select(command_future, timeout_future).await {
164 Either::Left((Ok(command), _)) => Some(ContextServerConfiguration::Extension {
165 command,
166 settings,
167 remote,
168 }),
169 Either::Left((Err(e), _)) => {
170 log::error!(
171 "Failed to create context server configuration from settings: {e:#}"
172 );
173 None
174 }
175 Either::Right(_) => {
176 log::error!(
177 "Timed out resolving command for extension context server {id}"
178 );
179 None
180 }
181 }
182 }
183 ContextServerSettings::Http {
184 enabled: _,
185 url,
186 headers: auth,
187 timeout,
188 } => {
189 let url = url::Url::parse(&url).log_err()?;
190 Some(ContextServerConfiguration::Http {
191 url,
192 headers: auth,
193 timeout,
194 })
195 }
196 }
197 }
198}
199
200pub type ContextServerFactory =
201 Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
202
203enum ContextServerStoreState {
204 Local {
205 downstream_client: Option<(u64, AnyProtoClient)>,
206 is_headless: bool,
207 },
208 Remote {
209 project_id: u64,
210 upstream_client: Entity<RemoteClient>,
211 },
212}
213
214pub struct ContextServerStore {
215 state: ContextServerStoreState,
216 context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
217 servers: HashMap<ContextServerId, ContextServerState>,
218 server_ids: Vec<ContextServerId>,
219 worktree_store: Entity<WorktreeStore>,
220 project: Option<WeakEntity<Project>>,
221 registry: Entity<ContextServerDescriptorRegistry>,
222 update_servers_task: Option<Task<Result<()>>>,
223 context_server_factory: Option<ContextServerFactory>,
224 needs_server_update: bool,
225 ai_disabled: bool,
226 _subscriptions: Vec<Subscription>,
227}
228
229pub struct ServerStatusChangedEvent {
230 pub server_id: ContextServerId,
231 pub status: ContextServerStatus,
232}
233
234impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
235
236impl ContextServerStore {
237 pub fn local(
238 worktree_store: Entity<WorktreeStore>,
239 weak_project: Option<WeakEntity<Project>>,
240 headless: bool,
241 cx: &mut Context<Self>,
242 ) -> Self {
243 Self::new_internal(
244 !headless,
245 None,
246 ContextServerDescriptorRegistry::default_global(cx),
247 worktree_store,
248 weak_project,
249 ContextServerStoreState::Local {
250 downstream_client: None,
251 is_headless: headless,
252 },
253 cx,
254 )
255 }
256
257 pub fn remote(
258 project_id: u64,
259 upstream_client: Entity<RemoteClient>,
260 worktree_store: Entity<WorktreeStore>,
261 weak_project: Option<WeakEntity<Project>>,
262 cx: &mut Context<Self>,
263 ) -> Self {
264 Self::new_internal(
265 true,
266 None,
267 ContextServerDescriptorRegistry::default_global(cx),
268 worktree_store,
269 weak_project,
270 ContextServerStoreState::Remote {
271 project_id,
272 upstream_client,
273 },
274 cx,
275 )
276 }
277
278 pub fn init_headless(session: &AnyProtoClient) {
279 session.add_entity_request_handler(Self::handle_get_context_server_command);
280 }
281
282 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
283 if let ContextServerStoreState::Local {
284 downstream_client, ..
285 } = &mut self.state
286 {
287 *downstream_client = Some((project_id, client));
288 }
289 }
290
291 pub fn is_remote_project(&self) -> bool {
292 matches!(self.state, ContextServerStoreState::Remote { .. })
293 }
294
295 /// Returns all configured context server ids, excluding the ones that are disabled
296 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
297 self.context_server_settings
298 .iter()
299 .filter(|(_, settings)| settings.enabled())
300 .map(|(id, _)| ContextServerId(id.clone()))
301 .collect()
302 }
303
304 #[cfg(feature = "test-support")]
305 pub fn test(
306 registry: Entity<ContextServerDescriptorRegistry>,
307 worktree_store: Entity<WorktreeStore>,
308 weak_project: Option<WeakEntity<Project>>,
309 cx: &mut Context<Self>,
310 ) -> Self {
311 Self::new_internal(
312 false,
313 None,
314 registry,
315 worktree_store,
316 weak_project,
317 ContextServerStoreState::Local {
318 downstream_client: None,
319 is_headless: false,
320 },
321 cx,
322 )
323 }
324
325 #[cfg(feature = "test-support")]
326 pub fn test_maintain_server_loop(
327 context_server_factory: Option<ContextServerFactory>,
328 registry: Entity<ContextServerDescriptorRegistry>,
329 worktree_store: Entity<WorktreeStore>,
330 weak_project: Option<WeakEntity<Project>>,
331 cx: &mut Context<Self>,
332 ) -> Self {
333 Self::new_internal(
334 true,
335 context_server_factory,
336 registry,
337 worktree_store,
338 weak_project,
339 ContextServerStoreState::Local {
340 downstream_client: None,
341 is_headless: false,
342 },
343 cx,
344 )
345 }
346
347 #[cfg(feature = "test-support")]
348 pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
349 self.context_server_factory = Some(factory);
350 }
351
352 #[cfg(feature = "test-support")]
353 pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
354 &self.registry
355 }
356
357 #[cfg(feature = "test-support")]
358 pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
359 let configuration = Arc::new(ContextServerConfiguration::Custom {
360 command: ContextServerCommand {
361 path: "test".into(),
362 args: vec![],
363 env: None,
364 timeout: None,
365 },
366 remote: false,
367 });
368 self.run_server(server, configuration, cx);
369 }
370
371 fn new_internal(
372 maintain_server_loop: bool,
373 context_server_factory: Option<ContextServerFactory>,
374 registry: Entity<ContextServerDescriptorRegistry>,
375 worktree_store: Entity<WorktreeStore>,
376 weak_project: Option<WeakEntity<Project>>,
377 state: ContextServerStoreState,
378 cx: &mut Context<Self>,
379 ) -> Self {
380 let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
381 let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
382 let ai_was_disabled = this.ai_disabled;
383 this.ai_disabled = ai_disabled;
384
385 let settings =
386 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
387 let settings_changed = &this.context_server_settings != settings;
388
389 if settings_changed {
390 this.context_server_settings = settings.clone();
391 }
392
393 // When AI is disabled, stop all running servers
394 if ai_disabled {
395 let server_ids: Vec<_> = this.servers.keys().cloned().collect();
396 for id in server_ids {
397 this.stop_server(&id, cx).log_err();
398 }
399 return;
400 }
401
402 // Trigger updates if AI was re-enabled or settings changed
403 if maintain_server_loop && (ai_was_disabled || settings_changed) {
404 this.available_context_servers_changed(cx);
405 }
406 })];
407
408 if maintain_server_loop {
409 subscriptions.push(cx.observe(®istry, |this, _registry, cx| {
410 if !DisableAiSettings::get_global(cx).disable_ai {
411 this.available_context_servers_changed(cx);
412 }
413 }));
414 }
415
416 let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
417 let mut this = Self {
418 state,
419 _subscriptions: subscriptions,
420 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
421 .context_servers
422 .clone(),
423 worktree_store,
424 project: weak_project,
425 registry,
426 needs_server_update: false,
427 ai_disabled,
428 servers: HashMap::default(),
429 server_ids: Default::default(),
430 update_servers_task: None,
431 context_server_factory,
432 };
433 if maintain_server_loop && !DisableAiSettings::get_global(cx).disable_ai {
434 this.available_context_servers_changed(cx);
435 }
436 this
437 }
438
439 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
440 self.servers.get(id).map(|state| state.server())
441 }
442
443 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
444 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
445 Some(server.clone())
446 } else {
447 None
448 }
449 }
450
451 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
452 self.servers.get(id).map(ContextServerStatus::from_state)
453 }
454
455 pub fn configuration_for_server(
456 &self,
457 id: &ContextServerId,
458 ) -> Option<Arc<ContextServerConfiguration>> {
459 self.servers.get(id).map(|state| state.configuration())
460 }
461
462 /// Returns a sorted slice of available unique context server IDs. Within the
463 /// slice, context servers which have `mcp-server-` as a prefix in their ID will
464 /// appear after servers that do not have this prefix in their ID.
465 pub fn server_ids(&self) -> &[ContextServerId] {
466 self.server_ids.as_slice()
467 }
468
469 fn populate_server_ids(&mut self, cx: &App) {
470 self.server_ids = self
471 .servers
472 .keys()
473 .cloned()
474 .chain(
475 self.registry
476 .read(cx)
477 .context_server_descriptors()
478 .into_iter()
479 .map(|(id, _)| ContextServerId(id)),
480 )
481 .chain(
482 self.context_server_settings
483 .keys()
484 .map(|id| ContextServerId(id.clone())),
485 )
486 .unique()
487 .sorted_unstable_by(
488 // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
489 |a, b| {
490 const MCP_PREFIX: &str = "mcp-server-";
491 match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
492 // If one has mcp-server- prefix and other doesn't, non-mcp comes first
493 (Some(_), None) => std::cmp::Ordering::Greater,
494 (None, Some(_)) => std::cmp::Ordering::Less,
495 // If both have same prefix status, sort by appropriate key
496 (Some(a), Some(b)) => a.cmp(b),
497 (None, None) => a.0.cmp(&b.0),
498 }
499 },
500 )
501 .collect();
502 }
503
504 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
505 self.servers
506 .values()
507 .filter_map(|state| {
508 if let ContextServerState::Running { server, .. } = state {
509 Some(server.clone())
510 } else {
511 None
512 }
513 })
514 .collect()
515 }
516
517 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
518 cx.spawn(async move |this, cx| {
519 let this = this.upgrade().context("Context server store dropped")?;
520 let settings = this
521 .update(cx, |this, _| {
522 this.context_server_settings.get(&server.id().0).cloned()
523 })
524 .context("Failed to get context server settings")?;
525
526 if !settings.enabled() {
527 return anyhow::Ok(());
528 }
529
530 let (registry, worktree_store) = this.update(cx, |this, _| {
531 (this.registry.clone(), this.worktree_store.clone())
532 });
533 let configuration = ContextServerConfiguration::from_settings(
534 settings,
535 server.id(),
536 registry,
537 worktree_store,
538 cx,
539 )
540 .await
541 .context("Failed to create context server configuration")?;
542
543 this.update(cx, |this, cx| {
544 this.run_server(server, Arc::new(configuration), cx)
545 });
546 Ok(())
547 })
548 .detach_and_log_err(cx);
549 }
550
551 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
552 if matches!(
553 self.servers.get(id),
554 Some(ContextServerState::Stopped { .. })
555 ) {
556 return Ok(());
557 }
558
559 let state = self
560 .servers
561 .remove(id)
562 .context("Context server not found")?;
563
564 let server = state.server();
565 let configuration = state.configuration();
566 let mut result = Ok(());
567 if let ContextServerState::Running { server, .. } = &state {
568 result = server.stop();
569 }
570 drop(state);
571
572 self.update_server_state(
573 id.clone(),
574 ContextServerState::Stopped {
575 configuration,
576 server,
577 },
578 cx,
579 );
580
581 result
582 }
583
584 fn run_server(
585 &mut self,
586 server: Arc<ContextServer>,
587 configuration: Arc<ContextServerConfiguration>,
588 cx: &mut Context<Self>,
589 ) {
590 let id = server.id();
591 if matches!(
592 self.servers.get(&id),
593 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
594 ) {
595 self.stop_server(&id, cx).log_err();
596 }
597 let task = cx.spawn({
598 let id = server.id();
599 let server = server.clone();
600 let configuration = configuration.clone();
601
602 async move |this, cx| {
603 match server.clone().start(cx).await {
604 Ok(_) => {
605 debug_assert!(server.client().is_some());
606
607 this.update(cx, |this, cx| {
608 this.update_server_state(
609 id.clone(),
610 ContextServerState::Running {
611 server,
612 configuration,
613 },
614 cx,
615 )
616 })
617 .log_err()
618 }
619 Err(err) => {
620 log::error!("{} context server failed to start: {}", id, err);
621 this.update(cx, |this, cx| {
622 this.update_server_state(
623 id.clone(),
624 ContextServerState::Error {
625 configuration,
626 server,
627 error: err.to_string().into(),
628 },
629 cx,
630 )
631 })
632 .log_err()
633 }
634 };
635 }
636 });
637
638 self.update_server_state(
639 id.clone(),
640 ContextServerState::Starting {
641 configuration,
642 _task: task,
643 server,
644 },
645 cx,
646 );
647 }
648
649 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
650 let state = self
651 .servers
652 .remove(id)
653 .context("Context server not found")?;
654 drop(state);
655 cx.emit(ServerStatusChangedEvent {
656 server_id: id.clone(),
657 status: ContextServerStatus::Stopped,
658 });
659 Ok(())
660 }
661
662 pub async fn create_context_server(
663 this: WeakEntity<Self>,
664 id: ContextServerId,
665 configuration: Arc<ContextServerConfiguration>,
666 cx: &mut AsyncApp,
667 ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
668 let remote = configuration.remote();
669 let needs_remote_command = match configuration.as_ref() {
670 ContextServerConfiguration::Custom { .. }
671 | ContextServerConfiguration::Extension { .. } => remote,
672 ContextServerConfiguration::Http { .. } => false,
673 };
674
675 let (remote_state, is_remote_project) = this.update(cx, |this, _| {
676 let remote_state = match &this.state {
677 ContextServerStoreState::Remote {
678 project_id,
679 upstream_client,
680 } if needs_remote_command => Some((*project_id, upstream_client.clone())),
681 _ => None,
682 };
683 (remote_state, this.is_remote_project())
684 })?;
685
686 let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
687 this.project
688 .as_ref()
689 .and_then(|project| {
690 project
691 .read_with(cx, |project, cx| project.active_project_directory(cx))
692 .ok()
693 .flatten()
694 })
695 .or_else(|| {
696 this.worktree_store.read_with(cx, |store, cx| {
697 store.visible_worktrees(cx).fold(None, |acc, item| {
698 if acc.is_none() {
699 item.read(cx).root_dir()
700 } else {
701 acc
702 }
703 })
704 })
705 })
706 })?;
707
708 let configuration = if let Some((project_id, upstream_client)) = remote_state {
709 let root_dir = root_path.as_ref().map(|p| p.display().to_string());
710
711 let response = upstream_client
712 .update(cx, |client, _| {
713 client
714 .proto_client()
715 .request(proto::GetContextServerCommand {
716 project_id,
717 server_id: id.0.to_string(),
718 root_dir: root_dir.clone(),
719 })
720 })
721 .await?;
722
723 let remote_command = upstream_client.update(cx, |client, _| {
724 client.build_command(
725 Some(response.path),
726 &response.args,
727 &response.env.into_iter().collect(),
728 root_dir,
729 None,
730 )
731 })?;
732
733 let command = ContextServerCommand {
734 path: remote_command.program.into(),
735 args: remote_command.args,
736 env: Some(remote_command.env.into_iter().collect()),
737 timeout: None,
738 };
739
740 Arc::new(ContextServerConfiguration::Custom { command, remote })
741 } else {
742 configuration
743 };
744
745 let server: Arc<ContextServer> = this.update(cx, |this, cx| {
746 let global_timeout =
747 Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
748
749 if let Some(factory) = this.context_server_factory.as_ref() {
750 return anyhow::Ok(factory(id.clone(), configuration.clone()));
751 }
752
753 match configuration.as_ref() {
754 ContextServerConfiguration::Http {
755 url,
756 headers,
757 timeout,
758 } => anyhow::Ok(Arc::new(ContextServer::http(
759 id,
760 url,
761 headers.clone(),
762 cx.http_client(),
763 cx.background_executor().clone(),
764 Some(Duration::from_secs(
765 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
766 )),
767 )?)),
768 _ => {
769 let mut command = configuration
770 .command()
771 .context("Missing command configuration for stdio context server")?
772 .clone();
773 command.timeout = Some(
774 command
775 .timeout
776 .unwrap_or(global_timeout)
777 .min(MAX_TIMEOUT_SECS),
778 );
779
780 // Don't pass remote paths as working directory for locally-spawned processes
781 let working_directory = if is_remote_project { None } else { root_path };
782 anyhow::Ok(Arc::new(ContextServer::stdio(
783 id,
784 command,
785 working_directory,
786 )))
787 }
788 }
789 })??;
790
791 Ok((server, configuration))
792 }
793
794 async fn handle_get_context_server_command(
795 this: Entity<Self>,
796 envelope: TypedEnvelope<proto::GetContextServerCommand>,
797 mut cx: AsyncApp,
798 ) -> Result<proto::ContextServerCommand> {
799 let server_id = ContextServerId(envelope.payload.server_id.into());
800
801 let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
802 let ContextServerStoreState::Local {
803 is_headless: true, ..
804 } = &this.state
805 else {
806 anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
807 };
808
809 let settings = this
810 .context_server_settings
811 .get(&server_id.0)
812 .cloned()
813 .or_else(|| {
814 this.registry
815 .read(inner_cx)
816 .context_server_descriptor(&server_id.0)
817 .map(|_| ContextServerSettings::default_extension())
818 })
819 .with_context(|| format!("context server `{}` not found", server_id))?;
820
821 anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
822 })?;
823
824 let configuration = ContextServerConfiguration::from_settings(
825 settings,
826 server_id.clone(),
827 registry,
828 worktree_store,
829 &cx,
830 )
831 .await
832 .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
833
834 let command = configuration
835 .command()
836 .context("context server has no command (HTTP servers don't need RPC)")?;
837
838 Ok(proto::ContextServerCommand {
839 path: command.path.display().to_string(),
840 args: command.args.clone(),
841 env: command
842 .env
843 .clone()
844 .map(|env| env.into_iter().collect())
845 .unwrap_or_default(),
846 })
847 }
848
849 fn resolve_project_settings<'a>(
850 worktree_store: &'a Entity<WorktreeStore>,
851 cx: &'a App,
852 ) -> &'a ProjectSettings {
853 let location = worktree_store
854 .read(cx)
855 .visible_worktrees(cx)
856 .next()
857 .map(|worktree| settings::SettingsLocation {
858 worktree_id: worktree.read(cx).id(),
859 path: RelPath::empty(),
860 });
861 ProjectSettings::get(location, cx)
862 }
863
864 fn update_server_state(
865 &mut self,
866 id: ContextServerId,
867 state: ContextServerState,
868 cx: &mut Context<Self>,
869 ) {
870 let status = ContextServerStatus::from_state(&state);
871 self.servers.insert(id.clone(), state);
872 cx.emit(ServerStatusChangedEvent {
873 server_id: id,
874 status,
875 });
876 }
877
878 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
879 if self.update_servers_task.is_some() {
880 self.needs_server_update = true;
881 } else {
882 self.needs_server_update = false;
883 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
884 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
885 log::error!("Error maintaining context servers: {}", err);
886 }
887
888 this.update(cx, |this, cx| {
889 this.populate_server_ids(cx);
890 cx.notify();
891 this.update_servers_task.take();
892 if this.needs_server_update {
893 this.available_context_servers_changed(cx);
894 }
895 })?;
896
897 Ok(())
898 }));
899 }
900 }
901
902 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
903 // Don't start context servers if AI is disabled
904 let ai_disabled = this.update(cx, |_, cx| DisableAiSettings::get_global(cx).disable_ai)?;
905 if ai_disabled {
906 // Stop all running servers when AI is disabled
907 this.update(cx, |this, cx| {
908 let server_ids: Vec<_> = this.servers.keys().cloned().collect();
909 for id in server_ids {
910 let _ = this.stop_server(&id, cx);
911 }
912 })?;
913 return Ok(());
914 }
915
916 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
917 (
918 this.context_server_settings.clone(),
919 this.registry.clone(),
920 this.worktree_store.clone(),
921 )
922 })?;
923
924 for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
925 configured_servers
926 .entry(id)
927 .or_insert(ContextServerSettings::default_extension());
928 }
929
930 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
931 configured_servers
932 .into_iter()
933 .partition(|(_, settings)| settings.enabled());
934
935 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
936 let id = ContextServerId(id);
937 ContextServerConfiguration::from_settings(
938 settings,
939 id.clone(),
940 registry.clone(),
941 worktree_store.clone(),
942 cx,
943 )
944 .map(move |config| (id, config))
945 }))
946 .await
947 .into_iter()
948 .filter_map(|(id, config)| config.map(|config| (id, config)))
949 .collect::<HashMap<_, _>>();
950
951 let mut servers_to_start = Vec::new();
952 let mut servers_to_remove = HashSet::default();
953 let mut servers_to_stop = HashSet::default();
954
955 this.update(cx, |this, _cx| {
956 for server_id in this.servers.keys() {
957 // All servers that are not in desired_servers should be removed from the store.
958 // This can happen if the user removed a server from the context server settings.
959 if !configured_servers.contains_key(server_id) {
960 if disabled_servers.contains_key(&server_id.0) {
961 servers_to_stop.insert(server_id.clone());
962 } else {
963 servers_to_remove.insert(server_id.clone());
964 }
965 }
966 }
967
968 for (id, config) in configured_servers {
969 let state = this.servers.get(&id);
970 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
971 let existing_config = state.as_ref().map(|state| state.configuration());
972 if existing_config.as_deref() != Some(&config) || is_stopped {
973 let config = Arc::new(config);
974 servers_to_start.push((id.clone(), config));
975 if this.servers.contains_key(&id) {
976 servers_to_stop.insert(id);
977 }
978 }
979 }
980
981 anyhow::Ok(())
982 })??;
983
984 this.update(cx, |this, inner_cx| {
985 for id in servers_to_stop {
986 this.stop_server(&id, inner_cx)?;
987 }
988 for id in servers_to_remove {
989 this.remove_server(&id, inner_cx)?;
990 }
991 anyhow::Ok(())
992 })??;
993
994 for (id, config) in servers_to_start {
995 match Self::create_context_server(this.clone(), id.clone(), config, cx).await {
996 Ok((server, config)) => {
997 this.update(cx, |this, cx| {
998 this.run_server(server, config, cx);
999 })?;
1000 }
1001 Err(err) => {
1002 log::error!("{id} context server failed to create: {err:#}");
1003 this.update(cx, |_this, cx| {
1004 cx.emit(ServerStatusChangedEvent {
1005 server_id: id,
1006 status: ContextServerStatus::Error(err.to_string().into()),
1007 });
1008 cx.notify();
1009 })?;
1010 }
1011 }
1012 }
1013
1014 Ok(())
1015 }
1016}