1pub mod extension;
2pub mod registry;
3
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context as _, Result};
9use collections::{HashMap, HashSet};
10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
11use futures::{FutureExt as _, future::join_all};
12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
13use itertools::Itertools;
14use registry::ContextServerDescriptorRegistry;
15use remote::RemoteClient;
16use rpc::{AnyProtoClient, TypedEnvelope, proto};
17use settings::{Settings as _, SettingsStore};
18use util::{ResultExt as _, rel_path::RelPath};
19
20use crate::{
21 DisableAiSettings, Project,
22 project_settings::{ContextServerSettings, ProjectSettings},
23 worktree_store::WorktreeStore,
24};
25
/// Maximum timeout, in seconds, for context server requests.
///
/// Timeouts taken from settings are capped at this value when a server is created,
/// which prevents extremely large timeout values from tying up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
29
/// Runs the initialization of the `extension` submodule against the given app context.
pub fn init(cx: &mut App) {
    extension::init(cx);
}
33
// Declares the actions exposed by this module via gpui's `actions!` macro.
// NOTE(review): the first argument is presumably the action namespace — confirm
// against the gpui `actions!` documentation.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
41
/// Public, cloneable summary of a context server's lifecycle state.
///
/// This mirrors the variants of the private `ContextServerState` without exposing
/// the server handle or configuration it carries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// A start has been requested and has not finished yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server is known to the store but is not running.
    Stopped,
    /// The server failed to start; the payload is the error rendered as text.
    Error(Arc<str>),
}
49
50impl ContextServerStatus {
51 fn from_state(state: &ContextServerState) -> Self {
52 match state {
53 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
54 ContextServerState::Running { .. } => ContextServerStatus::Running,
55 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
56 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
57 }
58 }
59}
60
/// Internal lifecycle state tracked by the store for each context server.
///
/// Every variant carries the server handle and the configuration it was created
/// with, so both remain available regardless of the current state.
enum ContextServerState {
    /// The server's start routine has been spawned and has not completed yet.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        // Task driving the start; it is owned by this state and dropped with it.
        // NOTE(review): presumably dropping the task cancels the start — confirm
        // against gpui `Task` semantics.
        _task: Task<()>,
    },
    /// The server started successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server failed to start; `error` holds the failure rendered as text.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
81
82impl ContextServerState {
83 pub fn server(&self) -> Arc<ContextServer> {
84 match self {
85 ContextServerState::Starting { server, .. } => server.clone(),
86 ContextServerState::Running { server, .. } => server.clone(),
87 ContextServerState::Stopped { server, .. } => server.clone(),
88 ContextServerState::Error { server, .. } => server.clone(),
89 }
90 }
91
92 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
93 match self {
94 ContextServerState::Starting { configuration, .. } => configuration.clone(),
95 ContextServerState::Running { configuration, .. } => configuration.clone(),
96 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
97 ContextServerState::Error { configuration, .. } => configuration.clone(),
98 }
99 }
100}
101
/// Resolved configuration describing how a context server is reached.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A server launched from a command given directly in the settings.
    Custom {
        command: ContextServerCommand,
        /// Whether the command should be resolved through the remote host when the
        /// project is remote.
        remote: bool,
    },
    /// A server whose command is supplied by a registered extension descriptor.
    Extension {
        command: ContextServerCommand,
        /// Settings passed through unchanged from the user's configuration.
        settings: serde_json::Value,
        /// Whether the command should be resolved through the remote host when the
        /// project is remote.
        remote: bool,
    },
    /// A server reached over HTTP instead of being spawned as a process.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server timeout; it is passed to `Duration::from_secs` when
        /// the server is created.
        timeout: Option<u64>,
    },
}
119
120impl ContextServerConfiguration {
121 pub fn command(&self) -> Option<&ContextServerCommand> {
122 match self {
123 ContextServerConfiguration::Custom { command, .. } => Some(command),
124 ContextServerConfiguration::Extension { command, .. } => Some(command),
125 ContextServerConfiguration::Http { .. } => None,
126 }
127 }
128
129 pub fn remote(&self) -> bool {
130 match self {
131 ContextServerConfiguration::Custom { remote, .. } => *remote,
132 ContextServerConfiguration::Extension { remote, .. } => *remote,
133 ContextServerConfiguration::Http { .. } => false,
134 }
135 }
136
137 pub async fn from_settings(
138 settings: ContextServerSettings,
139 id: ContextServerId,
140 registry: Entity<ContextServerDescriptorRegistry>,
141 worktree_store: Entity<WorktreeStore>,
142 cx: &AsyncApp,
143 ) -> Option<Self> {
144 match settings {
145 ContextServerSettings::Stdio {
146 enabled: _,
147 command,
148 remote,
149 } => Some(ContextServerConfiguration::Custom { command, remote }),
150 ContextServerSettings::Extension {
151 enabled: _,
152 settings,
153 remote,
154 } => {
155 let descriptor =
156 cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
157
158 match descriptor.command(worktree_store, cx).await {
159 Ok(command) => Some(ContextServerConfiguration::Extension {
160 command,
161 settings,
162 remote,
163 }),
164 Err(e) => {
165 log::error!(
166 "Failed to create context server configuration from settings: {e:#}"
167 );
168 None
169 }
170 }
171 }
172 ContextServerSettings::Http {
173 enabled: _,
174 url,
175 headers: auth,
176 timeout,
177 } => {
178 let url = url::Url::parse(&url).log_err()?;
179 Some(ContextServerConfiguration::Http {
180 url,
181 headers: auth,
182 timeout,
183 })
184 }
185 }
186 }
187}
188
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When a factory is installed on the store it is used instead of the built-in
/// HTTP/stdio construction.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
191
/// Distinguishes a store backing a local project from one backing a remote project.
enum ContextServerStoreState {
    /// The store belongs to a local project.
    Local {
        /// Project id and client recorded by `shared`; `None` until then.
        downstream_client: Option<(u64, AnyProtoClient)>,
        /// Whether the store was created in headless mode; only headless stores
        /// answer `GetContextServerCommand` requests.
        is_headless: bool,
    },
    /// The store belongs to a remote project and resolves commands via `upstream_client`.
    Remote {
        project_id: u64,
        upstream_client: Entity<RemoteClient>,
    },
}
202
/// Tracks the context servers configured for a project and their lifecycle state.
pub struct ContextServerStore {
    /// Whether this store backs a local or a remote project.
    state: ContextServerStoreState,
    /// Snapshot of the resolved context server settings, keyed by server id.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Lifecycle state of every server currently tracked by the store.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Cached, sorted list of known server ids; rebuilt by `populate_server_ids`.
    server_ids: Vec<ContextServerId>,
    /// Used to resolve project settings and a fallback root directory.
    worktree_store: Entity<WorktreeStore>,
    /// Weak handle to the owning project, used to look up the active project directory.
    project: Option<WeakEntity<Project>>,
    /// Registry of extension-provided context server descriptors.
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The in-flight maintenance task, if one is running.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override for how servers are constructed.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while a maintenance task is already running, so
    /// that another pass is scheduled once it finishes.
    needs_server_update: bool,
    // Observer subscriptions created in `new_internal`, held for the store's lifetime.
    // NOTE(review): presumably dropping them cancels the observers — confirm
    // against gpui `Subscription` semantics.
    _subscriptions: Vec<Subscription>,
}
216
/// Event emitted by [`ContextServerStore`] whenever a server's status changes.
pub struct ServerStatusChangedEvent {
    /// The server whose status changed.
    pub server_id: ContextServerId,
    /// The status the server now has.
    pub status: ContextServerStatus,
}

// Allows the store to emit `ServerStatusChangedEvent`s to its subscribers.
impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
223
224impl ContextServerStore {
225 pub fn local(
226 worktree_store: Entity<WorktreeStore>,
227 weak_project: Option<WeakEntity<Project>>,
228 headless: bool,
229 cx: &mut Context<Self>,
230 ) -> Self {
231 Self::new_internal(
232 !headless,
233 None,
234 ContextServerDescriptorRegistry::default_global(cx),
235 worktree_store,
236 weak_project,
237 ContextServerStoreState::Local {
238 downstream_client: None,
239 is_headless: headless,
240 },
241 cx,
242 )
243 }
244
245 pub fn remote(
246 project_id: u64,
247 upstream_client: Entity<RemoteClient>,
248 worktree_store: Entity<WorktreeStore>,
249 weak_project: Option<WeakEntity<Project>>,
250 cx: &mut Context<Self>,
251 ) -> Self {
252 Self::new_internal(
253 true,
254 None,
255 ContextServerDescriptorRegistry::default_global(cx),
256 worktree_store,
257 weak_project,
258 ContextServerStoreState::Remote {
259 project_id,
260 upstream_client,
261 },
262 cx,
263 )
264 }
265
    /// Registers the `GetContextServerCommand` request handler on the given session.
    pub fn init_headless(session: &AnyProtoClient) {
        session.add_entity_request_handler(Self::handle_get_context_server_command);
    }
269
270 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
271 if let ContextServerStoreState::Local {
272 downstream_client, ..
273 } = &mut self.state
274 {
275 *downstream_client = Some((project_id, client));
276 }
277 }
278
279 pub fn is_remote_project(&self) -> bool {
280 matches!(self.state, ContextServerStoreState::Remote { .. })
281 }
282
283 /// Returns all configured context server ids, excluding the ones that are disabled
284 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
285 self.context_server_settings
286 .iter()
287 .filter(|(_, settings)| settings.enabled())
288 .map(|(id, _)| ContextServerId(id.clone()))
289 .collect()
290 }
291
292 #[cfg(feature = "test-support")]
293 pub fn test(
294 registry: Entity<ContextServerDescriptorRegistry>,
295 worktree_store: Entity<WorktreeStore>,
296 weak_project: Option<WeakEntity<Project>>,
297 cx: &mut Context<Self>,
298 ) -> Self {
299 Self::new_internal(
300 false,
301 None,
302 registry,
303 worktree_store,
304 weak_project,
305 ContextServerStoreState::Local {
306 downstream_client: None,
307 is_headless: false,
308 },
309 cx,
310 )
311 }
312
313 #[cfg(feature = "test-support")]
314 pub fn test_maintain_server_loop(
315 context_server_factory: Option<ContextServerFactory>,
316 registry: Entity<ContextServerDescriptorRegistry>,
317 worktree_store: Entity<WorktreeStore>,
318 weak_project: Option<WeakEntity<Project>>,
319 cx: &mut Context<Self>,
320 ) -> Self {
321 Self::new_internal(
322 true,
323 context_server_factory,
324 registry,
325 worktree_store,
326 weak_project,
327 ContextServerStoreState::Local {
328 downstream_client: None,
329 is_headless: false,
330 },
331 cx,
332 )
333 }
334
    /// Installs the factory used to construct context servers, replacing any
    /// previously installed one.
    #[cfg(feature = "test-support")]
    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
        self.context_server_factory = Some(factory);
    }
339
    /// Returns the descriptor registry this store was created with.
    #[cfg(feature = "test-support")]
    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
        &self.registry
    }
344
345 #[cfg(feature = "test-support")]
346 pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
347 let configuration = Arc::new(ContextServerConfiguration::Custom {
348 command: ContextServerCommand {
349 path: "test".into(),
350 args: vec![],
351 env: None,
352 timeout: None,
353 },
354 remote: false,
355 });
356 self.run_server(server, configuration, cx);
357 }
358
359 fn new_internal(
360 maintain_server_loop: bool,
361 context_server_factory: Option<ContextServerFactory>,
362 registry: Entity<ContextServerDescriptorRegistry>,
363 worktree_store: Entity<WorktreeStore>,
364 weak_project: Option<WeakEntity<Project>>,
365 state: ContextServerStoreState,
366 cx: &mut Context<Self>,
367 ) -> Self {
368 let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
369 let settings =
370 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
371 if &this.context_server_settings == settings {
372 return;
373 }
374 this.context_server_settings = settings.clone();
375 if maintain_server_loop {
376 this.available_context_servers_changed(cx);
377 }
378 })];
379
380 if maintain_server_loop {
381 subscriptions.push(cx.observe(®istry, |this, _registry, cx| {
382 this.available_context_servers_changed(cx);
383 }));
384 }
385
386 let mut this = Self {
387 state,
388 _subscriptions: subscriptions,
389 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
390 .context_servers
391 .clone(),
392 worktree_store,
393 project: weak_project,
394 registry,
395 needs_server_update: false,
396 servers: HashMap::default(),
397 server_ids: Default::default(),
398 update_servers_task: None,
399 context_server_factory,
400 };
401 if maintain_server_loop {
402 this.available_context_servers_changed(cx);
403 }
404 this
405 }
406
407 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
408 self.servers.get(id).map(|state| state.server())
409 }
410
411 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
412 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
413 Some(server.clone())
414 } else {
415 None
416 }
417 }
418
419 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
420 self.servers.get(id).map(ContextServerStatus::from_state)
421 }
422
423 pub fn configuration_for_server(
424 &self,
425 id: &ContextServerId,
426 ) -> Option<Arc<ContextServerConfiguration>> {
427 self.servers.get(id).map(|state| state.configuration())
428 }
429
430 /// Returns a sorted slice of available unique context server IDs. Within the
431 /// slice, context servers which have `mcp-server-` as a prefix in their ID will
432 /// appear after servers that do not have this prefix in their ID.
433 pub fn server_ids(&self) -> &[ContextServerId] {
434 self.server_ids.as_slice()
435 }
436
437 fn populate_server_ids(&mut self, cx: &App) {
438 self.server_ids = self
439 .servers
440 .keys()
441 .cloned()
442 .chain(
443 self.registry
444 .read(cx)
445 .context_server_descriptors()
446 .into_iter()
447 .map(|(id, _)| ContextServerId(id)),
448 )
449 .chain(
450 self.context_server_settings
451 .keys()
452 .map(|id| ContextServerId(id.clone())),
453 )
454 .unique()
455 .sorted_unstable_by(
456 // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
457 |a, b| {
458 const MCP_PREFIX: &str = "mcp-server-";
459 match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
460 // If one has mcp-server- prefix and other doesn't, non-mcp comes first
461 (Some(_), None) => std::cmp::Ordering::Greater,
462 (None, Some(_)) => std::cmp::Ordering::Less,
463 // If both have same prefix status, sort by appropriate key
464 (Some(a), Some(b)) => a.cmp(b),
465 (None, None) => a.0.cmp(&b.0),
466 }
467 },
468 )
469 .collect();
470 }
471
472 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
473 self.servers
474 .values()
475 .filter_map(|state| {
476 if let ContextServerState::Running { server, .. } = state {
477 Some(server.clone())
478 } else {
479 None
480 }
481 })
482 .collect()
483 }
484
485 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
486 cx.spawn(async move |this, cx| {
487 let this = this.upgrade().context("Context server store dropped")?;
488 let settings = this
489 .update(cx, |this, _| {
490 this.context_server_settings.get(&server.id().0).cloned()
491 })
492 .context("Failed to get context server settings")?;
493
494 if !settings.enabled() {
495 return anyhow::Ok(());
496 }
497
498 let (registry, worktree_store) = this.update(cx, |this, _| {
499 (this.registry.clone(), this.worktree_store.clone())
500 });
501 let configuration = ContextServerConfiguration::from_settings(
502 settings,
503 server.id(),
504 registry,
505 worktree_store,
506 cx,
507 )
508 .await
509 .context("Failed to create context server configuration")?;
510
511 this.update(cx, |this, cx| {
512 this.run_server(server, Arc::new(configuration), cx)
513 });
514 Ok(())
515 })
516 .detach_and_log_err(cx);
517 }
518
519 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
520 if matches!(
521 self.servers.get(id),
522 Some(ContextServerState::Stopped { .. })
523 ) {
524 return Ok(());
525 }
526
527 let state = self
528 .servers
529 .remove(id)
530 .context("Context server not found")?;
531
532 let server = state.server();
533 let configuration = state.configuration();
534 let mut result = Ok(());
535 if let ContextServerState::Running { server, .. } = &state {
536 result = server.stop();
537 }
538 drop(state);
539
540 self.update_server_state(
541 id.clone(),
542 ContextServerState::Stopped {
543 configuration,
544 server,
545 },
546 cx,
547 );
548
549 result
550 }
551
552 fn run_server(
553 &mut self,
554 server: Arc<ContextServer>,
555 configuration: Arc<ContextServerConfiguration>,
556 cx: &mut Context<Self>,
557 ) {
558 let id = server.id();
559 if matches!(
560 self.servers.get(&id),
561 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
562 ) {
563 self.stop_server(&id, cx).log_err();
564 }
565 let task = cx.spawn({
566 let id = server.id();
567 let server = server.clone();
568 let configuration = configuration.clone();
569
570 async move |this, cx| {
571 match server.clone().start(cx).await {
572 Ok(_) => {
573 debug_assert!(server.client().is_some());
574
575 this.update(cx, |this, cx| {
576 this.update_server_state(
577 id.clone(),
578 ContextServerState::Running {
579 server,
580 configuration,
581 },
582 cx,
583 )
584 })
585 .log_err()
586 }
587 Err(err) => {
588 log::error!("{} context server failed to start: {}", id, err);
589 this.update(cx, |this, cx| {
590 this.update_server_state(
591 id.clone(),
592 ContextServerState::Error {
593 configuration,
594 server,
595 error: err.to_string().into(),
596 },
597 cx,
598 )
599 })
600 .log_err()
601 }
602 };
603 }
604 });
605
606 self.update_server_state(
607 id.clone(),
608 ContextServerState::Starting {
609 configuration,
610 _task: task,
611 server,
612 },
613 cx,
614 );
615 }
616
617 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
618 let state = self
619 .servers
620 .remove(id)
621 .context("Context server not found")?;
622 drop(state);
623 cx.emit(ServerStatusChangedEvent {
624 server_id: id.clone(),
625 status: ContextServerStatus::Stopped,
626 });
627 Ok(())
628 }
629
630 pub async fn create_context_server(
631 this: WeakEntity<Self>,
632 id: ContextServerId,
633 configuration: Arc<ContextServerConfiguration>,
634 cx: &mut AsyncApp,
635 ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
636 let remote = configuration.remote();
637 let needs_remote_command = match configuration.as_ref() {
638 ContextServerConfiguration::Custom { .. }
639 | ContextServerConfiguration::Extension { .. } => remote,
640 ContextServerConfiguration::Http { .. } => false,
641 };
642
643 let (remote_state, is_remote_project) = this.update(cx, |this, _| {
644 let remote_state = match &this.state {
645 ContextServerStoreState::Remote {
646 project_id,
647 upstream_client,
648 } if needs_remote_command => Some((*project_id, upstream_client.clone())),
649 _ => None,
650 };
651 (remote_state, this.is_remote_project())
652 })?;
653
654 let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
655 this.project
656 .as_ref()
657 .and_then(|project| {
658 project
659 .read_with(cx, |project, cx| project.active_project_directory(cx))
660 .ok()
661 .flatten()
662 })
663 .or_else(|| {
664 this.worktree_store.read_with(cx, |store, cx| {
665 store.visible_worktrees(cx).fold(None, |acc, item| {
666 if acc.is_none() {
667 item.read(cx).root_dir()
668 } else {
669 acc
670 }
671 })
672 })
673 })
674 })?;
675
676 let configuration = if let Some((project_id, upstream_client)) = remote_state {
677 let root_dir = root_path.as_ref().map(|p| p.display().to_string());
678
679 let response = upstream_client
680 .update(cx, |client, _| {
681 client
682 .proto_client()
683 .request(proto::GetContextServerCommand {
684 project_id,
685 server_id: id.0.to_string(),
686 root_dir: root_dir.clone(),
687 })
688 })
689 .await?;
690
691 let remote_command = upstream_client.update(cx, |client, _| {
692 client.build_command(
693 Some(response.path),
694 &response.args,
695 &response.env.into_iter().collect(),
696 root_dir,
697 None,
698 )
699 })?;
700
701 let command = ContextServerCommand {
702 path: remote_command.program.into(),
703 args: remote_command.args,
704 env: Some(remote_command.env.into_iter().collect()),
705 timeout: None,
706 };
707
708 Arc::new(ContextServerConfiguration::Custom { command, remote })
709 } else {
710 configuration
711 };
712
713 let server: Arc<ContextServer> = this.update(cx, |this, cx| {
714 let global_timeout =
715 Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
716
717 if let Some(factory) = this.context_server_factory.as_ref() {
718 return anyhow::Ok(factory(id.clone(), configuration.clone()));
719 }
720
721 match configuration.as_ref() {
722 ContextServerConfiguration::Http {
723 url,
724 headers,
725 timeout,
726 } => anyhow::Ok(Arc::new(ContextServer::http(
727 id,
728 url,
729 headers.clone(),
730 cx.http_client(),
731 cx.background_executor().clone(),
732 Some(Duration::from_secs(
733 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
734 )),
735 )?)),
736 _ => {
737 let mut command = configuration
738 .command()
739 .context("Missing command configuration for stdio context server")?
740 .clone();
741 command.timeout = Some(
742 command
743 .timeout
744 .unwrap_or(global_timeout)
745 .min(MAX_TIMEOUT_SECS),
746 );
747
748 // Don't pass remote paths as working directory for locally-spawned processes
749 let working_directory = if is_remote_project { None } else { root_path };
750 anyhow::Ok(Arc::new(ContextServer::stdio(
751 id,
752 command,
753 working_directory,
754 )))
755 }
756 }
757 })??;
758
759 Ok((server, configuration))
760 }
761
762 async fn handle_get_context_server_command(
763 this: Entity<Self>,
764 envelope: TypedEnvelope<proto::GetContextServerCommand>,
765 mut cx: AsyncApp,
766 ) -> Result<proto::ContextServerCommand> {
767 let server_id = ContextServerId(envelope.payload.server_id.into());
768
769 let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
770 let ContextServerStoreState::Local {
771 is_headless: true, ..
772 } = &this.state
773 else {
774 anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
775 };
776
777 let settings = this
778 .context_server_settings
779 .get(&server_id.0)
780 .cloned()
781 .or_else(|| {
782 this.registry
783 .read(inner_cx)
784 .context_server_descriptor(&server_id.0)
785 .map(|_| ContextServerSettings::default_extension())
786 })
787 .with_context(|| format!("context server `{}` not found", server_id))?;
788
789 anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
790 })?;
791
792 let configuration = ContextServerConfiguration::from_settings(
793 settings,
794 server_id.clone(),
795 registry,
796 worktree_store,
797 &cx,
798 )
799 .await
800 .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
801
802 let command = configuration
803 .command()
804 .context("context server has no command (HTTP servers don't need RPC)")?;
805
806 Ok(proto::ContextServerCommand {
807 path: command.path.display().to_string(),
808 args: command.args.clone(),
809 env: command
810 .env
811 .clone()
812 .map(|env| env.into_iter().collect())
813 .unwrap_or_default(),
814 })
815 }
816
817 fn resolve_project_settings<'a>(
818 worktree_store: &'a Entity<WorktreeStore>,
819 cx: &'a App,
820 ) -> &'a ProjectSettings {
821 let location = worktree_store
822 .read(cx)
823 .visible_worktrees(cx)
824 .next()
825 .map(|worktree| settings::SettingsLocation {
826 worktree_id: worktree.read(cx).id(),
827 path: RelPath::empty(),
828 });
829 ProjectSettings::get(location, cx)
830 }
831
832 fn update_server_state(
833 &mut self,
834 id: ContextServerId,
835 state: ContextServerState,
836 cx: &mut Context<Self>,
837 ) {
838 let status = ContextServerStatus::from_state(&state);
839 self.servers.insert(id.clone(), state);
840 cx.emit(ServerStatusChangedEvent {
841 server_id: id,
842 status,
843 });
844 }
845
846 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
847 if self.update_servers_task.is_some() {
848 self.needs_server_update = true;
849 } else {
850 self.needs_server_update = false;
851 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
852 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
853 log::error!("Error maintaining context servers: {}", err);
854 }
855
856 this.update(cx, |this, cx| {
857 this.populate_server_ids(cx);
858 this.update_servers_task.take();
859 if this.needs_server_update {
860 this.available_context_servers_changed(cx);
861 }
862 })?;
863
864 Ok(())
865 }));
866 }
867 }
868
869 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
870 // Don't start context servers if AI is disabled
871 let ai_disabled = this.update(cx, |_, cx| DisableAiSettings::get_global(cx).disable_ai)?;
872 if ai_disabled {
873 // Stop all running servers when AI is disabled
874 this.update(cx, |this, cx| {
875 let server_ids: Vec<_> = this.servers.keys().cloned().collect();
876 for id in server_ids {
877 let _ = this.stop_server(&id, cx);
878 }
879 })?;
880 return Ok(());
881 }
882
883 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
884 (
885 this.context_server_settings.clone(),
886 this.registry.clone(),
887 this.worktree_store.clone(),
888 )
889 })?;
890
891 for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
892 configured_servers
893 .entry(id)
894 .or_insert(ContextServerSettings::default_extension());
895 }
896
897 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
898 configured_servers
899 .into_iter()
900 .partition(|(_, settings)| settings.enabled());
901
902 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
903 let id = ContextServerId(id);
904 ContextServerConfiguration::from_settings(
905 settings,
906 id.clone(),
907 registry.clone(),
908 worktree_store.clone(),
909 cx,
910 )
911 .map(move |config| (id, config))
912 }))
913 .await
914 .into_iter()
915 .filter_map(|(id, config)| config.map(|config| (id, config)))
916 .collect::<HashMap<_, _>>();
917
918 let mut servers_to_start = Vec::new();
919 let mut servers_to_remove = HashSet::default();
920 let mut servers_to_stop = HashSet::default();
921
922 this.update(cx, |this, _cx| {
923 for server_id in this.servers.keys() {
924 // All servers that are not in desired_servers should be removed from the store.
925 // This can happen if the user removed a server from the context server settings.
926 if !configured_servers.contains_key(server_id) {
927 if disabled_servers.contains_key(&server_id.0) {
928 servers_to_stop.insert(server_id.clone());
929 } else {
930 servers_to_remove.insert(server_id.clone());
931 }
932 }
933 }
934
935 for (id, config) in configured_servers {
936 let state = this.servers.get(&id);
937 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
938 let existing_config = state.as_ref().map(|state| state.configuration());
939 if existing_config.as_deref() != Some(&config) || is_stopped {
940 let config = Arc::new(config);
941 servers_to_start.push((id.clone(), config));
942 if this.servers.contains_key(&id) {
943 servers_to_stop.insert(id);
944 }
945 }
946 }
947
948 anyhow::Ok(())
949 })??;
950
951 this.update(cx, |this, inner_cx| {
952 for id in servers_to_stop {
953 this.stop_server(&id, inner_cx)?;
954 }
955 for id in servers_to_remove {
956 this.remove_server(&id, inner_cx)?;
957 }
958 anyhow::Ok(())
959 })??;
960
961 for (id, config) in servers_to_start {
962 let (server, config) =
963 Self::create_context_server(this.clone(), id, config, cx).await?;
964 this.update(cx, |this, cx| {
965 this.run_server(server, config, cx);
966 })?;
967 }
968
969 Ok(())
970 }
971}