context_server_store.rs

  1pub mod extension;
  2pub mod registry;
  3
  4use std::{path::Path, sync::Arc};
  5
  6use anyhow::{Context as _, Result};
  7use collections::{HashMap, HashSet};
  8use context_server::{ContextServer, ContextServerId};
  9use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
 10use registry::ContextServerDescriptorRegistry;
 11use settings::{Settings as _, SettingsStore};
 12use util::ResultExt as _;
 13
 14use crate::{
 15    project_settings::{ContextServerConfiguration, ProjectSettings},
 16    worktree_store::WorktreeStore,
 17};
 18
/// Initializes context-server support by delegating to [`extension::init`].
pub fn init(cx: &mut App) {
    extension::init(cx);
}
 22
// Declares the `Restart` action under the `context_server` namespace.
actions!(context_server, [Restart]);
 24
 25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 26pub enum ContextServerStatus {
 27    Starting,
 28    Running,
 29    Stopped,
 30    Error(Arc<str>),
 31}
 32
 33impl ContextServerStatus {
 34    fn from_state(state: &ContextServerState) -> Self {
 35        match state {
 36            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
 37            ContextServerState::Running { .. } => ContextServerStatus::Running,
 38            ContextServerState::Stopped { error, .. } => {
 39                if let Some(error) = error {
 40                    ContextServerStatus::Error(error.clone())
 41                } else {
 42                    ContextServerStatus::Stopped
 43                }
 44            }
 45        }
 46    }
 47}
 48
/// Internal per-server bookkeeping kept by `ContextServerStore`.
///
/// Every variant retains the server handle and the configuration it was
/// created with, so both can be read back regardless of lifecycle phase.
enum ContextServerState {
    /// The server's start future is in flight.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        // Held so the start task lives exactly as long as this state does;
        // replacing or dropping the state drops the task.
        _task: Task<()>,
    },
    /// The server's start future completed successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped, or failed to start (`error` is then `Some`).
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Option<Arc<str>>,
    },
}
 65
 66impl ContextServerState {
 67    pub fn server(&self) -> Arc<ContextServer> {
 68        match self {
 69            ContextServerState::Starting { server, .. } => server.clone(),
 70            ContextServerState::Running { server, .. } => server.clone(),
 71            ContextServerState::Stopped { server, .. } => server.clone(),
 72        }
 73    }
 74
 75    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
 76        match self {
 77            ContextServerState::Starting { configuration, .. } => configuration.clone(),
 78            ContextServerState::Running { configuration, .. } => configuration.clone(),
 79            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
 80        }
 81    }
 82}
 83
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When a store holds one, `create_context_server` calls it instead of
/// constructing a stdio server from the configured command.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 86
/// Tracks the context servers known to a project and drives their lifecycle.
pub struct ContextServerStore {
    /// Current state of every known server, keyed by its id.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Used to find the first visible worktree when looking up project settings.
    worktree_store: Entity<WorktreeStore>,
    /// Source of context server descriptors read by `maintain_servers`.
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The in-flight `maintain_servers` task, if one is currently running.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override used to construct servers instead of the stdio default.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while `update_servers_task` is still running,
    /// so that another maintenance pass is scheduled once it finishes.
    needs_server_update: bool,
    /// Registry/settings observers; kept alive for the lifetime of the store.
    _subscriptions: Vec<Subscription>,
}
 96
/// Events emitted by [`ContextServerStore`].
pub enum Event {
    /// Emitted whenever a server's state is inserted, replaced, or removed.
    ServerStatusChanged {
        /// Id of the server whose status changed.
        server_id: ContextServerId,
        /// The status after the change.
        status: ContextServerStatus,
    },
}
103
// Allows the store to `cx.emit` [`Event`] values.
impl EventEmitter<Event> for ContextServerStore {}
105
106impl ContextServerStore {
107    pub fn new(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
108        Self::new_internal(
109            true,
110            None,
111            ContextServerDescriptorRegistry::default_global(cx),
112            worktree_store,
113            cx,
114        )
115    }
116
    /// Test-only constructor: builds a store with the given registry that does
    /// NOT observe registry/settings changes and has no custom server factory,
    /// so servers are only started or stopped through explicit calls.
    #[cfg(any(test, feature = "test-support"))]
    pub fn test(
        registry: Entity<ContextServerDescriptorRegistry>,
        worktree_store: Entity<WorktreeStore>,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new_internal(false, None, registry, worktree_store, cx)
    }
125
126    #[cfg(any(test, feature = "test-support"))]
127    pub fn test_maintain_server_loop(
128        context_server_factory: ContextServerFactory,
129        registry: Entity<ContextServerDescriptorRegistry>,
130        worktree_store: Entity<WorktreeStore>,
131        cx: &mut Context<Self>,
132    ) -> Self {
133        Self::new_internal(
134            true,
135            Some(context_server_factory),
136            registry,
137            worktree_store,
138            cx,
139        )
140    }
141
142    fn new_internal(
143        maintain_server_loop: bool,
144        context_server_factory: Option<ContextServerFactory>,
145        registry: Entity<ContextServerDescriptorRegistry>,
146        worktree_store: Entity<WorktreeStore>,
147        cx: &mut Context<Self>,
148    ) -> Self {
149        let subscriptions = if maintain_server_loop {
150            vec![
151                cx.observe(&registry, |this, _registry, cx| {
152                    this.available_context_servers_changed(cx);
153                }),
154                cx.observe_global::<SettingsStore>(|this, cx| {
155                    this.available_context_servers_changed(cx);
156                }),
157            ]
158        } else {
159            Vec::new()
160        };
161
162        let mut this = Self {
163            _subscriptions: subscriptions,
164            worktree_store,
165            registry,
166            needs_server_update: false,
167            servers: HashMap::default(),
168            update_servers_task: None,
169            context_server_factory,
170        };
171        if maintain_server_loop {
172            this.available_context_servers_changed(cx);
173        }
174        this
175    }
176
177    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
178        self.servers.get(id).map(|state| state.server())
179    }
180
181    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
182        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
183            Some(server.clone())
184        } else {
185            None
186        }
187    }
188
189    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
190        self.servers.get(id).map(ContextServerStatus::from_state)
191    }
192
193    pub fn all_server_ids(&self) -> Vec<ContextServerId> {
194        self.servers.keys().cloned().collect()
195    }
196
197    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
198        self.servers
199            .values()
200            .filter_map(|state| {
201                if let ContextServerState::Running { server, .. } = state {
202                    Some(server.clone())
203                } else {
204                    None
205                }
206            })
207            .collect()
208    }
209
210    pub fn start_server(
211        &mut self,
212        server: Arc<ContextServer>,
213        cx: &mut Context<Self>,
214    ) -> Result<()> {
215        let location = self
216            .worktree_store
217            .read(cx)
218            .visible_worktrees(cx)
219            .next()
220            .map(|worktree| settings::SettingsLocation {
221                worktree_id: worktree.read(cx).id(),
222                path: Path::new(""),
223            });
224        let settings = ProjectSettings::get(location, cx);
225        let configuration = settings
226            .context_servers
227            .get(&server.id().0)
228            .context("Failed to load context server configuration from settings")?
229            .clone();
230
231        self.run_server(server, Arc::new(configuration), cx);
232        Ok(())
233    }
234
235    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
236        let state = self
237            .servers
238            .remove(id)
239            .context("Context server not found")?;
240
241        let server = state.server();
242        let configuration = state.configuration();
243        let mut result = Ok(());
244        if let ContextServerState::Running { server, .. } = &state {
245            result = server.stop();
246        }
247        drop(state);
248
249        self.update_server_state(
250            id.clone(),
251            ContextServerState::Stopped {
252                configuration,
253                server,
254                error: None,
255            },
256            cx,
257        );
258
259        result
260    }
261
262    pub fn restart_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
263        if let Some(state) = self.servers.get(&id) {
264            let configuration = state.configuration();
265
266            self.stop_server(&state.server().id(), cx)?;
267            let new_server = self.create_context_server(id.clone(), configuration.clone())?;
268            self.run_server(new_server, configuration, cx);
269        }
270        Ok(())
271    }
272
273    fn run_server(
274        &mut self,
275        server: Arc<ContextServer>,
276        configuration: Arc<ContextServerConfiguration>,
277        cx: &mut Context<Self>,
278    ) {
279        let id = server.id();
280        if matches!(
281            self.servers.get(&id),
282            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
283        ) {
284            self.stop_server(&id, cx).log_err();
285        }
286
287        let task = cx.spawn({
288            let id = server.id();
289            let server = server.clone();
290            let configuration = configuration.clone();
291            async move |this, cx| {
292                match server.clone().start(&cx).await {
293                    Ok(_) => {
294                        log::info!("Started {} context server", id);
295                        debug_assert!(server.client().is_some());
296
297                        this.update(cx, |this, cx| {
298                            this.update_server_state(
299                                id.clone(),
300                                ContextServerState::Running {
301                                    server,
302                                    configuration,
303                                },
304                                cx,
305                            )
306                        })
307                        .log_err()
308                    }
309                    Err(err) => {
310                        log::error!("{} context server failed to start: {}", id, err);
311                        this.update(cx, |this, cx| {
312                            this.update_server_state(
313                                id.clone(),
314                                ContextServerState::Stopped {
315                                    configuration,
316                                    server,
317                                    error: Some(err.to_string().into()),
318                                },
319                                cx,
320                            )
321                        })
322                        .log_err()
323                    }
324                };
325            }
326        });
327
328        self.update_server_state(
329            id.clone(),
330            ContextServerState::Starting {
331                configuration,
332                _task: task,
333                server,
334            },
335            cx,
336        );
337    }
338
339    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
340        let state = self
341            .servers
342            .remove(id)
343            .context("Context server not found")?;
344        drop(state);
345        cx.emit(Event::ServerStatusChanged {
346            server_id: id.clone(),
347            status: ContextServerStatus::Stopped,
348        });
349        Ok(())
350    }
351
352    fn is_configuration_valid(&self, configuration: &ContextServerConfiguration) -> bool {
353        // Command must be some when we are running in stdio mode.
354        self.context_server_factory.as_ref().is_some() || configuration.command.is_some()
355    }
356
357    fn create_context_server(
358        &self,
359        id: ContextServerId,
360        configuration: Arc<ContextServerConfiguration>,
361    ) -> Result<Arc<ContextServer>> {
362        if let Some(factory) = self.context_server_factory.as_ref() {
363            Ok(factory(id, configuration))
364        } else {
365            let command = configuration
366                .command
367                .clone()
368                .context("Missing command to run context server")?;
369            Ok(Arc::new(ContextServer::stdio(id, command)))
370        }
371    }
372
373    fn update_server_state(
374        &mut self,
375        id: ContextServerId,
376        state: ContextServerState,
377        cx: &mut Context<Self>,
378    ) {
379        let status = ContextServerStatus::from_state(&state);
380        self.servers.insert(id.clone(), state);
381        cx.emit(Event::ServerStatusChanged {
382            server_id: id,
383            status,
384        });
385    }
386
387    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
388        if self.update_servers_task.is_some() {
389            self.needs_server_update = true;
390        } else {
391            self.needs_server_update = false;
392            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
393                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
394                    log::error!("Error maintaining context servers: {}", err);
395                }
396
397                this.update(cx, |this, cx| {
398                    this.update_servers_task.take();
399                    if this.needs_server_update {
400                        this.available_context_servers_changed(cx);
401                    }
402                })?;
403
404                Ok(())
405            }));
406        }
407    }
408
409    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
410        let mut desired_servers = HashMap::default();
411
412        let (registry, worktree_store) = this.update(cx, |this, cx| {
413            let location = this
414                .worktree_store
415                .read(cx)
416                .visible_worktrees(cx)
417                .next()
418                .map(|worktree| settings::SettingsLocation {
419                    worktree_id: worktree.read(cx).id(),
420                    path: Path::new(""),
421                });
422            let settings = ProjectSettings::get(location, cx);
423            desired_servers = settings.context_servers.clone();
424
425            (this.registry.clone(), this.worktree_store.clone())
426        })?;
427
428        for (id, descriptor) in
429            registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
430        {
431            let config = desired_servers.entry(id.clone()).or_default();
432            if config.command.is_none() {
433                if let Some(extension_command) = descriptor
434                    .command(worktree_store.clone(), &cx)
435                    .await
436                    .log_err()
437                {
438                    config.command = Some(extension_command);
439                }
440            }
441        }
442
443        this.update(cx, |this, _| {
444            // Filter out configurations without commands, the user uninstalled an extension.
445            desired_servers.retain(|_, configuration| this.is_configuration_valid(configuration));
446        })?;
447
448        let mut servers_to_start = Vec::new();
449        let mut servers_to_remove = HashSet::default();
450        let mut servers_to_stop = HashSet::default();
451
452        this.update(cx, |this, _cx| {
453            for server_id in this.servers.keys() {
454                // All servers that are not in desired_servers should be removed from the store.
455                // E.g. this can happen if the user removed a server from the configuration,
456                // or the user uninstalled an extension.
457                if !desired_servers.contains_key(&server_id.0) {
458                    servers_to_remove.insert(server_id.clone());
459                }
460            }
461
462            for (id, config) in desired_servers {
463                let id = ContextServerId(id.clone());
464
465                let existing_config = this.servers.get(&id).map(|state| state.configuration());
466                if existing_config.as_deref() != Some(&config) {
467                    let config = Arc::new(config);
468                    if let Some(server) = this
469                        .create_context_server(id.clone(), config.clone())
470                        .log_err()
471                    {
472                        servers_to_start.push((server, config));
473                        if this.servers.contains_key(&id) {
474                            servers_to_stop.insert(id);
475                        }
476                    }
477                }
478            }
479        })?;
480
481        for id in servers_to_stop {
482            this.update(cx, |this, cx| this.stop_server(&id, cx).ok())?;
483        }
484
485        for id in servers_to_remove {
486            this.update(cx, |this, cx| this.remove_server(&id, cx).ok())?;
487        }
488
489        for (server, config) in servers_to_start {
490            this.update(cx, |this, cx| this.run_server(server, config, cx))
491                .log_err();
492        }
493
494        Ok(())
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use crate::{FakeFs, Project, project_settings::ProjectSettings};
502    use context_server::test::create_fake_transport;
503    use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
504    use serde_json::json;
505    use std::{cell::RefCell, rc::Rc};
506    use util::path;
507
    /// Verifies `status_for_server` while two servers are started one after
    /// the other and the second one is then stopped.
    #[gpui::test]
    async fn test_context_server_status(cx: &mut TestAppContext) {
        const SERVER_1_ID: &'static str = "mcp-1";
        const SERVER_2_ID: &'static str = "mcp-2";

        // Both servers need a settings entry, otherwise `start_server` fails.
        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), ContextServerConfiguration::default()),
                (SERVER_2_ID.into(), ContextServerConfiguration::default()),
            ],
        )
        .await;

        // `ContextServerStore::test` does not run the maintenance loop, so
        // servers only change state through the explicit calls below.
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        store
            .update(cx, |store, cx| store.start_server(server_1, cx))
            .unwrap();

        // Let the start task run to completion.
        cx.run_until_parked();

        // Only the first server is known to the store so far.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
        });

        store
            .update(cx, |store, cx| store.start_server(server_2.clone(), cx))
            .unwrap();

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Running)
            );
        });

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();

        // Stopping is applied synchronously and leaves the other server alone.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Stopped)
            );
        });
    }
586
    /// Verifies the exact sequence of `ServerStatusChanged` events emitted
    /// while two servers are started and the second one is stopped.
    #[gpui::test]
    async fn test_context_server_status_events(cx: &mut TestAppContext) {
        const SERVER_1_ID: &'static str = "mcp-1";
        const SERVER_2_ID: &'static str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), ContextServerConfiguration::default()),
                (SERVER_2_ID.into(), ContextServerConfiguration::default()),
            ],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        // The guard checks each event as it arrives and, when dropped at the
        // end of the test, that the total number of events matches.
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_1_id.clone(), ContextServerStatus::Starting),
                (server_1_id.clone(), ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Starting),
                (server_2_id.clone(), ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Stopped),
            ],
            cx,
        );

        store
            .update(cx, |store, cx| store.start_server(server_1, cx))
            .unwrap();

        cx.run_until_parked();

        store
            .update(cx, |store, cx| store.start_server(server_2.clone(), cx))
            .unwrap();

        cx.run_until_parked();

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();
    }
647
    /// Verifies that starting a second server with the same id while the first
    /// one is still starting stops the first one and ends with a running
    /// server.
    #[gpui::test(iterations = 25)]
    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
        const SERVER_1_ID: &'static str = "mcp-1";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(SERVER_1_ID.into(), ContextServerConfiguration::default())],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(registry.clone(), project.read(cx).worktree_store(), cx)
        });

        let server_id = ContextServerId(SERVER_1_ID.into());

        // Two distinct server instances that share one id.
        let server_with_same_id_1 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_with_same_id_2 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));

        // If we start another server with the same id, we should report that we stopped the previous one
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Stopped),
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Running),
            ],
            cx,
        );

        // Start both back to back, without letting the first start task run.
        store
            .update(cx, |store, cx| {
                store.start_server(server_with_same_id_1.clone(), cx)
            })
            .unwrap();
        store
            .update(cx, |store, cx| {
                store.start_server(server_with_same_id_2.clone(), cx)
            })
            .unwrap();
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_id),
                Some(ContextServerStatus::Starting)
            );
        });

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_id),
                Some(ContextServerStatus::Running)
            );
        });
    }
713
    /// Exercises the settings-driven maintenance loop: initial start, restart
    /// on configuration change, no-op on unchanged configuration, start of a
    /// newly added server, and removal of a server dropped from the settings.
    #[gpui::test]
    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
        const SERVER_1_ID: &'static str = "mcp-1";
        const SERVER_2_ID: &'static str = "mcp-2";

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(
                SERVER_1_ID.into(),
                ContextServerConfiguration {
                    command: None,
                    settings: Some(json!({
                        "somevalue": true
                    })),
                },
            )],
        )
        .await;

        // The factory lets configurations without a command pass validation
        // and backs every created server with a fake transport.
        let executor = cx.executor();
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test_maintain_server_loop(
                Box::new(move |id, _| {
                    Arc::new(ContextServer::new(
                        id.clone(),
                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
                    ))
                }),
                registry.clone(),
                project.read(cx).worktree_store(),
                cx,
            )
        });

        // Each scope below owns an event guard; the expected event count is
        // checked when the guard is dropped at the end of the scope.

        // Ensure that mcp-1 starts up
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            cx.run_until_parked();
        }

        // Ensure that mcp-1 is restarted when the configuration was changed
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Stopped),
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    ContextServerConfiguration {
                        command: None,
                        settings: Some(json!({
                            "somevalue": false
                        })),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-1 is not restarted when the configuration was not changed
        {
            let _server_events = assert_server_events(&store, vec![], cx);
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    ContextServerConfiguration {
                        command: None,
                        settings: Some(json!({
                            "somevalue": false
                        })),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is started once it is added to the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_2_id.clone(), ContextServerStatus::Starting),
                    (server_2_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![
                    (
                        server_1_id.0.clone(),
                        ContextServerConfiguration {
                            command: None,
                            settings: Some(json!({
                                "somevalue": false
                            })),
                        },
                    ),
                    (
                        server_2_id.0.clone(),
                        ContextServerConfiguration {
                            command: None,
                            settings: Some(json!({
                                "somevalue": true
                            })),
                        },
                    ),
                ],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is removed once it is removed from the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    ContextServerConfiguration {
                        command: None,
                        settings: Some(json!({
                            "somevalue": false
                        })),
                    },
                )],
                cx,
            );

            cx.run_until_parked();

            // A removed server is gone from the store, not merely stopped.
            cx.update(|cx| {
                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
            });
        }
    }
876
877    fn set_context_server_configuration(
878        context_servers: Vec<(Arc<str>, ContextServerConfiguration)>,
879        cx: &mut TestAppContext,
880    ) {
881        cx.update(|cx| {
882            SettingsStore::update_global(cx, |store, cx| {
883                let mut settings = ProjectSettings::default();
884                for (id, config) in context_servers {
885                    settings.context_servers.insert(id, config);
886                }
887                store
888                    .set_user_settings(&serde_json::to_string(&settings).unwrap(), cx)
889                    .unwrap();
890            })
891        });
892    }
893
    /// Guard returned by `assert_server_events`.
    ///
    /// Keeps the event subscription alive and, when dropped, asserts that the
    /// number of received events equals the number of expected events.
    struct ServerEvents {
        /// Number of events seen so far; shared with the subscription callback.
        received_event_count: Rc<RefCell<usize>>,
        /// Number of events the test expects in total.
        expected_event_count: usize,
        /// Held so the subscription stays registered while the guard lives.
        _subscription: Subscription,
    }
899
900    impl Drop for ServerEvents {
901        fn drop(&mut self) {
902            let actual_event_count = *self.received_event_count.borrow();
903            assert_eq!(
904                actual_event_count, self.expected_event_count,
905                "
906                Expected to receive {} context server store events, but received {} events",
907                self.expected_event_count, actual_event_count
908            );
909        }
910    }
911
912    fn assert_server_events(
913        store: &Entity<ContextServerStore>,
914        expected_events: Vec<(ContextServerId, ContextServerStatus)>,
915        cx: &mut TestAppContext,
916    ) -> ServerEvents {
917        cx.update(|cx| {
918            let mut ix = 0;
919            let received_event_count = Rc::new(RefCell::new(0));
920            let expected_event_count = expected_events.len();
921            let subscription = cx.subscribe(store, {
922                let received_event_count = received_event_count.clone();
923                move |_, event, _| match event {
924                    Event::ServerStatusChanged {
925                        server_id: actual_server_id,
926                        status: actual_status,
927                    } => {
928                        let (expected_server_id, expected_status) = &expected_events[ix];
929
930                        assert_eq!(
931                            actual_server_id, expected_server_id,
932                            "Expected different server id at index {}",
933                            ix
934                        );
935                        assert_eq!(
936                            actual_status, expected_status,
937                            "Expected different status at index {}",
938                            ix
939                        );
940                        ix += 1;
941                        *received_event_count.borrow_mut() += 1;
942                    }
943                }
944            });
945            ServerEvents {
946                expected_event_count,
947                received_event_count,
948                _subscription: subscription,
949            }
950        })
951    }
952
953    async fn setup_context_server_test(
954        cx: &mut TestAppContext,
955        files: serde_json::Value,
956        context_server_configurations: Vec<(Arc<str>, ContextServerConfiguration)>,
957    ) -> (Arc<FakeFs>, Entity<Project>) {
958        cx.update(|cx| {
959            let settings_store = SettingsStore::test(cx);
960            cx.set_global(settings_store);
961            Project::init_settings(cx);
962            let mut settings = ProjectSettings::get_global(cx).clone();
963            for (id, config) in context_server_configurations {
964                settings.context_servers.insert(id, config);
965            }
966            ProjectSettings::override_global(settings, cx);
967        });
968
969        let fs = FakeFs::new(cx.executor());
970        fs.insert_tree(path!("/test"), files).await;
971        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
972
973        (fs, project)
974    }
975}