1pub mod extension;
2pub mod registry;
3
4use std::{path::Path, sync::Arc};
5
6use anyhow::{Context as _, Result};
7use collections::{HashMap, HashSet};
8use context_server::{ContextServer, ContextServerId};
9use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
10use registry::ContextServerDescriptorRegistry;
11use settings::{Settings as _, SettingsStore};
12use util::ResultExt as _;
13
14use crate::{
15 project_settings::{ContextServerConfiguration, ProjectSettings},
16 worktree_store::WorktreeStore,
17};
18
19pub fn init(cx: &mut App) {
20 extension::init(cx);
21}
22
23actions!(context_server, [Restart]);
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub enum ContextServerStatus {
27 Starting,
28 Running,
29 Stopped,
30 Error(Arc<str>),
31}
32
33impl ContextServerStatus {
34 fn from_state(state: &ContextServerState) -> Self {
35 match state {
36 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
37 ContextServerState::Running { .. } => ContextServerStatus::Running,
38 ContextServerState::Stopped { error, .. } => {
39 if let Some(error) = error {
40 ContextServerStatus::Error(error.clone())
41 } else {
42 ContextServerStatus::Stopped
43 }
44 }
45 }
46 }
47}
48
49enum ContextServerState {
50 Starting {
51 server: Arc<ContextServer>,
52 configuration: Arc<ContextServerConfiguration>,
53 _task: Task<()>,
54 },
55 Running {
56 server: Arc<ContextServer>,
57 configuration: Arc<ContextServerConfiguration>,
58 },
59 Stopped {
60 server: Arc<ContextServer>,
61 configuration: Arc<ContextServerConfiguration>,
62 error: Option<Arc<str>>,
63 },
64}
65
66impl ContextServerState {
67 pub fn server(&self) -> Arc<ContextServer> {
68 match self {
69 ContextServerState::Starting { server, .. } => server.clone(),
70 ContextServerState::Running { server, .. } => server.clone(),
71 ContextServerState::Stopped { server, .. } => server.clone(),
72 }
73 }
74
75 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
76 match self {
77 ContextServerState::Starting { configuration, .. } => configuration.clone(),
78 ContextServerState::Running { configuration, .. } => configuration.clone(),
79 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
80 }
81 }
82}
83
84pub type ContextServerFactory =
85 Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
86
87pub struct ContextServerStore {
88 servers: HashMap<ContextServerId, ContextServerState>,
89 worktree_store: Entity<WorktreeStore>,
90 registry: Entity<ContextServerDescriptorRegistry>,
91 update_servers_task: Option<Task<Result<()>>>,
92 context_server_factory: Option<ContextServerFactory>,
93 needs_server_update: bool,
94 _subscriptions: Vec<Subscription>,
95}
96
97pub enum Event {
98 ServerStatusChanged {
99 server_id: ContextServerId,
100 status: ContextServerStatus,
101 },
102}
103
104impl EventEmitter<Event> for ContextServerStore {}
105
106impl ContextServerStore {
107 pub fn new(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
108 Self::new_internal(
109 true,
110 None,
111 ContextServerDescriptorRegistry::default_global(cx),
112 worktree_store,
113 cx,
114 )
115 }
116
117 #[cfg(any(test, feature = "test-support"))]
118 pub fn test(
119 registry: Entity<ContextServerDescriptorRegistry>,
120 worktree_store: Entity<WorktreeStore>,
121 cx: &mut Context<Self>,
122 ) -> Self {
123 Self::new_internal(false, None, registry, worktree_store, cx)
124 }
125
126 #[cfg(any(test, feature = "test-support"))]
127 pub fn test_maintain_server_loop(
128 context_server_factory: ContextServerFactory,
129 registry: Entity<ContextServerDescriptorRegistry>,
130 worktree_store: Entity<WorktreeStore>,
131 cx: &mut Context<Self>,
132 ) -> Self {
133 Self::new_internal(
134 true,
135 Some(context_server_factory),
136 registry,
137 worktree_store,
138 cx,
139 )
140 }
141
142 fn new_internal(
143 maintain_server_loop: bool,
144 context_server_factory: Option<ContextServerFactory>,
145 registry: Entity<ContextServerDescriptorRegistry>,
146 worktree_store: Entity<WorktreeStore>,
147 cx: &mut Context<Self>,
148 ) -> Self {
149 let subscriptions = if maintain_server_loop {
150 vec![
151 cx.observe(®istry, |this, _registry, cx| {
152 this.available_context_servers_changed(cx);
153 }),
154 cx.observe_global::<SettingsStore>(|this, cx| {
155 this.available_context_servers_changed(cx);
156 }),
157 ]
158 } else {
159 Vec::new()
160 };
161
162 let mut this = Self {
163 _subscriptions: subscriptions,
164 worktree_store,
165 registry,
166 needs_server_update: false,
167 servers: HashMap::default(),
168 update_servers_task: None,
169 context_server_factory,
170 };
171 if maintain_server_loop {
172 this.available_context_servers_changed(cx);
173 }
174 this
175 }
176
177 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
178 self.servers.get(id).map(|state| state.server())
179 }
180
181 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
182 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
183 Some(server.clone())
184 } else {
185 None
186 }
187 }
188
189 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
190 self.servers.get(id).map(ContextServerStatus::from_state)
191 }
192
193 pub fn all_server_ids(&self) -> Vec<ContextServerId> {
194 self.servers.keys().cloned().collect()
195 }
196
197 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
198 self.servers
199 .values()
200 .filter_map(|state| {
201 if let ContextServerState::Running { server, .. } = state {
202 Some(server.clone())
203 } else {
204 None
205 }
206 })
207 .collect()
208 }
209
210 pub fn start_server(
211 &mut self,
212 server: Arc<ContextServer>,
213 cx: &mut Context<Self>,
214 ) -> Result<()> {
215 let location = self
216 .worktree_store
217 .read(cx)
218 .visible_worktrees(cx)
219 .next()
220 .map(|worktree| settings::SettingsLocation {
221 worktree_id: worktree.read(cx).id(),
222 path: Path::new(""),
223 });
224 let settings = ProjectSettings::get(location, cx);
225 let configuration = settings
226 .context_servers
227 .get(&server.id().0)
228 .context("Failed to load context server configuration from settings")?
229 .clone();
230
231 self.run_server(server, Arc::new(configuration), cx);
232 Ok(())
233 }
234
235 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
236 let state = self
237 .servers
238 .remove(id)
239 .context("Context server not found")?;
240
241 let server = state.server();
242 let configuration = state.configuration();
243 let mut result = Ok(());
244 if let ContextServerState::Running { server, .. } = &state {
245 result = server.stop();
246 }
247 drop(state);
248
249 self.update_server_state(
250 id.clone(),
251 ContextServerState::Stopped {
252 configuration,
253 server,
254 error: None,
255 },
256 cx,
257 );
258
259 result
260 }
261
262 pub fn restart_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
263 if let Some(state) = self.servers.get(&id) {
264 let configuration = state.configuration();
265
266 self.stop_server(&state.server().id(), cx)?;
267 let new_server = self.create_context_server(id.clone(), configuration.clone())?;
268 self.run_server(new_server, configuration, cx);
269 }
270 Ok(())
271 }
272
273 fn run_server(
274 &mut self,
275 server: Arc<ContextServer>,
276 configuration: Arc<ContextServerConfiguration>,
277 cx: &mut Context<Self>,
278 ) {
279 let id = server.id();
280 if matches!(
281 self.servers.get(&id),
282 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
283 ) {
284 self.stop_server(&id, cx).log_err();
285 }
286
287 let task = cx.spawn({
288 let id = server.id();
289 let server = server.clone();
290 let configuration = configuration.clone();
291 async move |this, cx| {
292 match server.clone().start(&cx).await {
293 Ok(_) => {
294 log::info!("Started {} context server", id);
295 debug_assert!(server.client().is_some());
296
297 this.update(cx, |this, cx| {
298 this.update_server_state(
299 id.clone(),
300 ContextServerState::Running {
301 server,
302 configuration,
303 },
304 cx,
305 )
306 })
307 .log_err()
308 }
309 Err(err) => {
310 log::error!("{} context server failed to start: {}", id, err);
311 this.update(cx, |this, cx| {
312 this.update_server_state(
313 id.clone(),
314 ContextServerState::Stopped {
315 configuration,
316 server,
317 error: Some(err.to_string().into()),
318 },
319 cx,
320 )
321 })
322 .log_err()
323 }
324 };
325 }
326 });
327
328 self.update_server_state(
329 id.clone(),
330 ContextServerState::Starting {
331 configuration,
332 _task: task,
333 server,
334 },
335 cx,
336 );
337 }
338
339 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
340 let state = self
341 .servers
342 .remove(id)
343 .context("Context server not found")?;
344 drop(state);
345 cx.emit(Event::ServerStatusChanged {
346 server_id: id.clone(),
347 status: ContextServerStatus::Stopped,
348 });
349 Ok(())
350 }
351
352 fn is_configuration_valid(&self, configuration: &ContextServerConfiguration) -> bool {
353 // Command must be some when we are running in stdio mode.
354 self.context_server_factory.as_ref().is_some() || configuration.command.is_some()
355 }
356
357 fn create_context_server(
358 &self,
359 id: ContextServerId,
360 configuration: Arc<ContextServerConfiguration>,
361 ) -> Result<Arc<ContextServer>> {
362 if let Some(factory) = self.context_server_factory.as_ref() {
363 Ok(factory(id, configuration))
364 } else {
365 let command = configuration
366 .command
367 .clone()
368 .context("Missing command to run context server")?;
369 Ok(Arc::new(ContextServer::stdio(id, command)))
370 }
371 }
372
373 fn update_server_state(
374 &mut self,
375 id: ContextServerId,
376 state: ContextServerState,
377 cx: &mut Context<Self>,
378 ) {
379 let status = ContextServerStatus::from_state(&state);
380 self.servers.insert(id.clone(), state);
381 cx.emit(Event::ServerStatusChanged {
382 server_id: id,
383 status,
384 });
385 }
386
387 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
388 if self.update_servers_task.is_some() {
389 self.needs_server_update = true;
390 } else {
391 self.needs_server_update = false;
392 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
393 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
394 log::error!("Error maintaining context servers: {}", err);
395 }
396
397 this.update(cx, |this, cx| {
398 this.update_servers_task.take();
399 if this.needs_server_update {
400 this.available_context_servers_changed(cx);
401 }
402 })?;
403
404 Ok(())
405 }));
406 }
407 }
408
409 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
410 let mut desired_servers = HashMap::default();
411
412 let (registry, worktree_store) = this.update(cx, |this, cx| {
413 let location = this
414 .worktree_store
415 .read(cx)
416 .visible_worktrees(cx)
417 .next()
418 .map(|worktree| settings::SettingsLocation {
419 worktree_id: worktree.read(cx).id(),
420 path: Path::new(""),
421 });
422 let settings = ProjectSettings::get(location, cx);
423 desired_servers = settings.context_servers.clone();
424
425 (this.registry.clone(), this.worktree_store.clone())
426 })?;
427
428 for (id, descriptor) in
429 registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
430 {
431 let config = desired_servers.entry(id.clone()).or_default();
432 if config.command.is_none() {
433 if let Some(extension_command) = descriptor
434 .command(worktree_store.clone(), &cx)
435 .await
436 .log_err()
437 {
438 config.command = Some(extension_command);
439 }
440 }
441 }
442
443 this.update(cx, |this, _| {
444 // Filter out configurations without commands, the user uninstalled an extension.
445 desired_servers.retain(|_, configuration| this.is_configuration_valid(configuration));
446 })?;
447
448 let mut servers_to_start = Vec::new();
449 let mut servers_to_remove = HashSet::default();
450 let mut servers_to_stop = HashSet::default();
451
452 this.update(cx, |this, _cx| {
453 for server_id in this.servers.keys() {
454 // All servers that are not in desired_servers should be removed from the store.
455 // E.g. this can happen if the user removed a server from the configuration,
456 // or the user uninstalled an extension.
457 if !desired_servers.contains_key(&server_id.0) {
458 servers_to_remove.insert(server_id.clone());
459 }
460 }
461
462 for (id, config) in desired_servers {
463 let id = ContextServerId(id.clone());
464
465 let existing_config = this.servers.get(&id).map(|state| state.configuration());
466 if existing_config.as_deref() != Some(&config) {
467 let config = Arc::new(config);
468 if let Some(server) = this
469 .create_context_server(id.clone(), config.clone())
470 .log_err()
471 {
472 servers_to_start.push((server, config));
473 if this.servers.contains_key(&id) {
474 servers_to_stop.insert(id);
475 }
476 }
477 }
478 }
479 })?;
480
481 for id in servers_to_stop {
482 this.update(cx, |this, cx| this.stop_server(&id, cx).ok())?;
483 }
484
485 for id in servers_to_remove {
486 this.update(cx, |this, cx| this.remove_server(&id, cx).ok())?;
487 }
488
489 for (server, config) in servers_to_start {
490 this.update(cx, |this, cx| this.run_server(server, config, cx))
491 .log_err();
492 }
493
494 Ok(())
495 }
496}
497
498#[cfg(test)]
499mod tests {
500 use super::*;
501 use crate::{FakeFs, Project, project_settings::ProjectSettings};
502 use context_server::test::create_fake_transport;
503 use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
504 use serde_json::json;
505 use std::{cell::RefCell, rc::Rc};
506 use util::path;
507
508 #[gpui::test]
509 async fn test_context_server_status(cx: &mut TestAppContext) {
510 const SERVER_1_ID: &'static str = "mcp-1";
511 const SERVER_2_ID: &'static str = "mcp-2";
512
513 let (_fs, project) = setup_context_server_test(
514 cx,
515 json!({"code.rs": ""}),
516 vec![
517 (SERVER_1_ID.into(), ContextServerConfiguration::default()),
518 (SERVER_2_ID.into(), ContextServerConfiguration::default()),
519 ],
520 )
521 .await;
522
523 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
524 let store = cx.new(|cx| {
525 ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
526 });
527
528 let server_1_id = ContextServerId(SERVER_1_ID.into());
529 let server_2_id = ContextServerId(SERVER_2_ID.into());
530
531 let server_1 = Arc::new(ContextServer::new(
532 server_1_id.clone(),
533 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
534 ));
535 let server_2 = Arc::new(ContextServer::new(
536 server_2_id.clone(),
537 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
538 ));
539
540 store
541 .update(cx, |store, cx| store.start_server(server_1, cx))
542 .unwrap();
543
544 cx.run_until_parked();
545
546 cx.update(|cx| {
547 assert_eq!(
548 store.read(cx).status_for_server(&server_1_id),
549 Some(ContextServerStatus::Running)
550 );
551 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
552 });
553
554 store
555 .update(cx, |store, cx| store.start_server(server_2.clone(), cx))
556 .unwrap();
557
558 cx.run_until_parked();
559
560 cx.update(|cx| {
561 assert_eq!(
562 store.read(cx).status_for_server(&server_1_id),
563 Some(ContextServerStatus::Running)
564 );
565 assert_eq!(
566 store.read(cx).status_for_server(&server_2_id),
567 Some(ContextServerStatus::Running)
568 );
569 });
570
571 store
572 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
573 .unwrap();
574
575 cx.update(|cx| {
576 assert_eq!(
577 store.read(cx).status_for_server(&server_1_id),
578 Some(ContextServerStatus::Running)
579 );
580 assert_eq!(
581 store.read(cx).status_for_server(&server_2_id),
582 Some(ContextServerStatus::Stopped)
583 );
584 });
585 }
586
587 #[gpui::test]
588 async fn test_context_server_status_events(cx: &mut TestAppContext) {
589 const SERVER_1_ID: &'static str = "mcp-1";
590 const SERVER_2_ID: &'static str = "mcp-2";
591
592 let (_fs, project) = setup_context_server_test(
593 cx,
594 json!({"code.rs": ""}),
595 vec![
596 (SERVER_1_ID.into(), ContextServerConfiguration::default()),
597 (SERVER_2_ID.into(), ContextServerConfiguration::default()),
598 ],
599 )
600 .await;
601
602 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
603 let store = cx.new(|cx| {
604 ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
605 });
606
607 let server_1_id = ContextServerId(SERVER_1_ID.into());
608 let server_2_id = ContextServerId(SERVER_2_ID.into());
609
610 let server_1 = Arc::new(ContextServer::new(
611 server_1_id.clone(),
612 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
613 ));
614 let server_2 = Arc::new(ContextServer::new(
615 server_2_id.clone(),
616 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
617 ));
618
619 let _server_events = assert_server_events(
620 &store,
621 vec![
622 (server_1_id.clone(), ContextServerStatus::Starting),
623 (server_1_id.clone(), ContextServerStatus::Running),
624 (server_2_id.clone(), ContextServerStatus::Starting),
625 (server_2_id.clone(), ContextServerStatus::Running),
626 (server_2_id.clone(), ContextServerStatus::Stopped),
627 ],
628 cx,
629 );
630
631 store
632 .update(cx, |store, cx| store.start_server(server_1, cx))
633 .unwrap();
634
635 cx.run_until_parked();
636
637 store
638 .update(cx, |store, cx| store.start_server(server_2.clone(), cx))
639 .unwrap();
640
641 cx.run_until_parked();
642
643 store
644 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
645 .unwrap();
646 }
647
648 #[gpui::test(iterations = 25)]
649 async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
650 const SERVER_1_ID: &'static str = "mcp-1";
651
652 let (_fs, project) = setup_context_server_test(
653 cx,
654 json!({"code.rs": ""}),
655 vec![(SERVER_1_ID.into(), ContextServerConfiguration::default())],
656 )
657 .await;
658
659 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
660 let store = cx.new(|cx| {
661 ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
662 });
663
664 let server_id = ContextServerId(SERVER_1_ID.into());
665
666 let server_with_same_id_1 = Arc::new(ContextServer::new(
667 server_id.clone(),
668 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
669 ));
670 let server_with_same_id_2 = Arc::new(ContextServer::new(
671 server_id.clone(),
672 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
673 ));
674
675 // If we start another server with the same id, we should report that we stopped the previous one
676 let _server_events = assert_server_events(
677 &store,
678 vec![
679 (server_id.clone(), ContextServerStatus::Starting),
680 (server_id.clone(), ContextServerStatus::Stopped),
681 (server_id.clone(), ContextServerStatus::Starting),
682 (server_id.clone(), ContextServerStatus::Running),
683 ],
684 cx,
685 );
686
687 store
688 .update(cx, |store, cx| {
689 store.start_server(server_with_same_id_1.clone(), cx)
690 })
691 .unwrap();
692 store
693 .update(cx, |store, cx| {
694 store.start_server(server_with_same_id_2.clone(), cx)
695 })
696 .unwrap();
697 cx.update(|cx| {
698 assert_eq!(
699 store.read(cx).status_for_server(&server_id),
700 Some(ContextServerStatus::Starting)
701 );
702 });
703
704 cx.run_until_parked();
705
706 cx.update(|cx| {
707 assert_eq!(
708 store.read(cx).status_for_server(&server_id),
709 Some(ContextServerStatus::Running)
710 );
711 });
712 }
713
714 #[gpui::test]
715 async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
716 const SERVER_1_ID: &'static str = "mcp-1";
717 const SERVER_2_ID: &'static str = "mcp-2";
718
719 let server_1_id = ContextServerId(SERVER_1_ID.into());
720 let server_2_id = ContextServerId(SERVER_2_ID.into());
721
722 let (_fs, project) = setup_context_server_test(
723 cx,
724 json!({"code.rs": ""}),
725 vec![(
726 SERVER_1_ID.into(),
727 ContextServerConfiguration {
728 command: None,
729 settings: Some(json!({
730 "somevalue": true
731 })),
732 },
733 )],
734 )
735 .await;
736
737 let executor = cx.executor();
738 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
739 let store = cx.new(|cx| {
740 ContextServerStore::test_maintain_server_loop(
741 Box::new(move |id, _| {
742 Arc::new(ContextServer::new(
743 id.clone(),
744 Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
745 ))
746 }),
747 registry.clone(),
748 project.read(cx).worktree_store(),
749 cx,
750 )
751 });
752
753 // Ensure that mcp-1 starts up
754 {
755 let _server_events = assert_server_events(
756 &store,
757 vec![
758 (server_1_id.clone(), ContextServerStatus::Starting),
759 (server_1_id.clone(), ContextServerStatus::Running),
760 ],
761 cx,
762 );
763 cx.run_until_parked();
764 }
765
766 // Ensure that mcp-1 is restarted when the configuration was changed
767 {
768 let _server_events = assert_server_events(
769 &store,
770 vec![
771 (server_1_id.clone(), ContextServerStatus::Stopped),
772 (server_1_id.clone(), ContextServerStatus::Starting),
773 (server_1_id.clone(), ContextServerStatus::Running),
774 ],
775 cx,
776 );
777 set_context_server_configuration(
778 vec![(
779 server_1_id.0.clone(),
780 ContextServerConfiguration {
781 command: None,
782 settings: Some(json!({
783 "somevalue": false
784 })),
785 },
786 )],
787 cx,
788 );
789
790 cx.run_until_parked();
791 }
792
793 // Ensure that mcp-1 is not restarted when the configuration was not changed
794 {
795 let _server_events = assert_server_events(&store, vec![], cx);
796 set_context_server_configuration(
797 vec![(
798 server_1_id.0.clone(),
799 ContextServerConfiguration {
800 command: None,
801 settings: Some(json!({
802 "somevalue": false
803 })),
804 },
805 )],
806 cx,
807 );
808
809 cx.run_until_parked();
810 }
811
812 // Ensure that mcp-2 is started once it is added to the settings
813 {
814 let _server_events = assert_server_events(
815 &store,
816 vec![
817 (server_2_id.clone(), ContextServerStatus::Starting),
818 (server_2_id.clone(), ContextServerStatus::Running),
819 ],
820 cx,
821 );
822 set_context_server_configuration(
823 vec![
824 (
825 server_1_id.0.clone(),
826 ContextServerConfiguration {
827 command: None,
828 settings: Some(json!({
829 "somevalue": false
830 })),
831 },
832 ),
833 (
834 server_2_id.0.clone(),
835 ContextServerConfiguration {
836 command: None,
837 settings: Some(json!({
838 "somevalue": true
839 })),
840 },
841 ),
842 ],
843 cx,
844 );
845
846 cx.run_until_parked();
847 }
848
849 // Ensure that mcp-2 is removed once it is removed from the settings
850 {
851 let _server_events = assert_server_events(
852 &store,
853 vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
854 cx,
855 );
856 set_context_server_configuration(
857 vec![(
858 server_1_id.0.clone(),
859 ContextServerConfiguration {
860 command: None,
861 settings: Some(json!({
862 "somevalue": false
863 })),
864 },
865 )],
866 cx,
867 );
868
869 cx.run_until_parked();
870
871 cx.update(|cx| {
872 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
873 });
874 }
875 }
876
877 fn set_context_server_configuration(
878 context_servers: Vec<(Arc<str>, ContextServerConfiguration)>,
879 cx: &mut TestAppContext,
880 ) {
881 cx.update(|cx| {
882 SettingsStore::update_global(cx, |store, cx| {
883 let mut settings = ProjectSettings::default();
884 for (id, config) in context_servers {
885 settings.context_servers.insert(id, config);
886 }
887 store
888 .set_user_settings(&serde_json::to_string(&settings).unwrap(), cx)
889 .unwrap();
890 })
891 });
892 }
893
894 struct ServerEvents {
895 received_event_count: Rc<RefCell<usize>>,
896 expected_event_count: usize,
897 _subscription: Subscription,
898 }
899
900 impl Drop for ServerEvents {
901 fn drop(&mut self) {
902 let actual_event_count = *self.received_event_count.borrow();
903 assert_eq!(
904 actual_event_count, self.expected_event_count,
905 "
906 Expected to receive {} context server store events, but received {} events",
907 self.expected_event_count, actual_event_count
908 );
909 }
910 }
911
912 fn assert_server_events(
913 store: &Entity<ContextServerStore>,
914 expected_events: Vec<(ContextServerId, ContextServerStatus)>,
915 cx: &mut TestAppContext,
916 ) -> ServerEvents {
917 cx.update(|cx| {
918 let mut ix = 0;
919 let received_event_count = Rc::new(RefCell::new(0));
920 let expected_event_count = expected_events.len();
921 let subscription = cx.subscribe(store, {
922 let received_event_count = received_event_count.clone();
923 move |_, event, _| match event {
924 Event::ServerStatusChanged {
925 server_id: actual_server_id,
926 status: actual_status,
927 } => {
928 let (expected_server_id, expected_status) = &expected_events[ix];
929
930 assert_eq!(
931 actual_server_id, expected_server_id,
932 "Expected different server id at index {}",
933 ix
934 );
935 assert_eq!(
936 actual_status, expected_status,
937 "Expected different status at index {}",
938 ix
939 );
940 ix += 1;
941 *received_event_count.borrow_mut() += 1;
942 }
943 }
944 });
945 ServerEvents {
946 expected_event_count,
947 received_event_count,
948 _subscription: subscription,
949 }
950 })
951 }
952
953 async fn setup_context_server_test(
954 cx: &mut TestAppContext,
955 files: serde_json::Value,
956 context_server_configurations: Vec<(Arc<str>, ContextServerConfiguration)>,
957 ) -> (Arc<FakeFs>, Entity<Project>) {
958 cx.update(|cx| {
959 let settings_store = SettingsStore::test(cx);
960 cx.set_global(settings_store);
961 Project::init_settings(cx);
962 let mut settings = ProjectSettings::get_global(cx).clone();
963 for (id, config) in context_server_configurations {
964 settings.context_servers.insert(id, config);
965 }
966 ProjectSettings::override_global(settings, cx);
967 });
968
969 let fs = FakeFs::new(cx.executor());
970 fs.insert_tree(path!("/test"), files).await;
971 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
972
973 (fs, project)
974 }
975}