1pub mod extension;
2pub mod registry;
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result};
8use collections::{HashMap, HashSet};
9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
10use futures::{FutureExt as _, future::join_all};
11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
12use registry::ContextServerDescriptorRegistry;
13use settings::{Settings as _, SettingsStore};
14use util::{ResultExt as _, rel_path::RelPath};
15
16use crate::{
17 Project,
18 project_settings::{ContextServerSettings, ProjectSettings},
19 worktree_store::WorktreeStore,
20};
21
22/// Maximum timeout for context server requests
23/// Prevents extremely large timeout values from tying up resources indefinitely.
24const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
25
26pub fn init(cx: &mut App) {
27 extension::init(cx);
28}
29
30actions!(
31 context_server,
32 [
33 /// Restarts the context server.
34 Restart
35 ]
36);
37
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub enum ContextServerStatus {
40 Starting,
41 Running,
42 Stopped,
43 Error(Arc<str>),
44}
45
46impl ContextServerStatus {
47 fn from_state(state: &ContextServerState) -> Self {
48 match state {
49 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
50 ContextServerState::Running { .. } => ContextServerStatus::Running,
51 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
52 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
53 }
54 }
55}
56
57enum ContextServerState {
58 Starting {
59 server: Arc<ContextServer>,
60 configuration: Arc<ContextServerConfiguration>,
61 _task: Task<()>,
62 },
63 Running {
64 server: Arc<ContextServer>,
65 configuration: Arc<ContextServerConfiguration>,
66 },
67 Stopped {
68 server: Arc<ContextServer>,
69 configuration: Arc<ContextServerConfiguration>,
70 },
71 Error {
72 server: Arc<ContextServer>,
73 configuration: Arc<ContextServerConfiguration>,
74 error: Arc<str>,
75 },
76}
77
78impl ContextServerState {
79 pub fn server(&self) -> Arc<ContextServer> {
80 match self {
81 ContextServerState::Starting { server, .. } => server.clone(),
82 ContextServerState::Running { server, .. } => server.clone(),
83 ContextServerState::Stopped { server, .. } => server.clone(),
84 ContextServerState::Error { server, .. } => server.clone(),
85 }
86 }
87
88 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
89 match self {
90 ContextServerState::Starting { configuration, .. } => configuration.clone(),
91 ContextServerState::Running { configuration, .. } => configuration.clone(),
92 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
93 ContextServerState::Error { configuration, .. } => configuration.clone(),
94 }
95 }
96}
97
98#[derive(Debug, PartialEq, Eq)]
99pub enum ContextServerConfiguration {
100 Custom {
101 command: ContextServerCommand,
102 },
103 Extension {
104 command: ContextServerCommand,
105 settings: serde_json::Value,
106 },
107 Http {
108 url: url::Url,
109 headers: HashMap<String, String>,
110 timeout: Option<u64>,
111 },
112}
113
114impl ContextServerConfiguration {
115 pub fn command(&self) -> Option<&ContextServerCommand> {
116 match self {
117 ContextServerConfiguration::Custom { command } => Some(command),
118 ContextServerConfiguration::Extension { command, .. } => Some(command),
119 ContextServerConfiguration::Http { .. } => None,
120 }
121 }
122
123 pub async fn from_settings(
124 settings: ContextServerSettings,
125 id: ContextServerId,
126 registry: Entity<ContextServerDescriptorRegistry>,
127 worktree_store: Entity<WorktreeStore>,
128 cx: &AsyncApp,
129 ) -> Option<Self> {
130 match settings {
131 ContextServerSettings::Stdio {
132 enabled: _,
133 command,
134 } => Some(ContextServerConfiguration::Custom { command }),
135 ContextServerSettings::Extension {
136 enabled: _,
137 settings,
138 } => {
139 let descriptor =
140 cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
141
142 match descriptor.command(worktree_store, cx).await {
143 Ok(command) => {
144 Some(ContextServerConfiguration::Extension { command, settings })
145 }
146 Err(e) => {
147 log::error!(
148 "Failed to create context server configuration from settings: {e:#}"
149 );
150 None
151 }
152 }
153 }
154 ContextServerSettings::Http {
155 enabled: _,
156 url,
157 headers: auth,
158 timeout,
159 } => {
160 let url = url::Url::parse(&url).log_err()?;
161 Some(ContextServerConfiguration::Http {
162 url,
163 headers: auth,
164 timeout,
165 })
166 }
167 }
168 }
169}
170
171pub type ContextServerFactory =
172 Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
173
174pub struct ContextServerStore {
175 context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
176 servers: HashMap<ContextServerId, ContextServerState>,
177 worktree_store: Entity<WorktreeStore>,
178 project: WeakEntity<Project>,
179 registry: Entity<ContextServerDescriptorRegistry>,
180 update_servers_task: Option<Task<Result<()>>>,
181 context_server_factory: Option<ContextServerFactory>,
182 needs_server_update: bool,
183 _subscriptions: Vec<Subscription>,
184}
185
186pub enum Event {
187 ServerStatusChanged {
188 server_id: ContextServerId,
189 status: ContextServerStatus,
190 },
191}
192
193impl EventEmitter<Event> for ContextServerStore {}
194
195impl ContextServerStore {
196 pub fn new(
197 worktree_store: Entity<WorktreeStore>,
198 weak_project: WeakEntity<Project>,
199 cx: &mut Context<Self>,
200 ) -> Self {
201 Self::new_internal(
202 true,
203 None,
204 ContextServerDescriptorRegistry::default_global(cx),
205 worktree_store,
206 weak_project,
207 cx,
208 )
209 }
210
211 /// Returns all configured context server ids, excluding the ones that are disabled
212 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
213 self.context_server_settings
214 .iter()
215 .filter(|(_, settings)| settings.enabled())
216 .map(|(id, _)| ContextServerId(id.clone()))
217 .collect()
218 }
219
220 #[cfg(any(test, feature = "test-support"))]
221 pub fn test(
222 registry: Entity<ContextServerDescriptorRegistry>,
223 worktree_store: Entity<WorktreeStore>,
224 weak_project: WeakEntity<Project>,
225 cx: &mut Context<Self>,
226 ) -> Self {
227 Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
228 }
229
230 #[cfg(any(test, feature = "test-support"))]
231 pub fn test_maintain_server_loop(
232 context_server_factory: Option<ContextServerFactory>,
233 registry: Entity<ContextServerDescriptorRegistry>,
234 worktree_store: Entity<WorktreeStore>,
235 weak_project: WeakEntity<Project>,
236 cx: &mut Context<Self>,
237 ) -> Self {
238 Self::new_internal(
239 true,
240 context_server_factory,
241 registry,
242 worktree_store,
243 weak_project,
244 cx,
245 )
246 }
247
248 fn new_internal(
249 maintain_server_loop: bool,
250 context_server_factory: Option<ContextServerFactory>,
251 registry: Entity<ContextServerDescriptorRegistry>,
252 worktree_store: Entity<WorktreeStore>,
253 weak_project: WeakEntity<Project>,
254 cx: &mut Context<Self>,
255 ) -> Self {
256 let subscriptions = if maintain_server_loop {
257 vec![
258 cx.observe(®istry, |this, _registry, cx| {
259 this.available_context_servers_changed(cx);
260 }),
261 cx.observe_global::<SettingsStore>(|this, cx| {
262 let settings =
263 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
264 if &this.context_server_settings == settings {
265 return;
266 }
267 this.context_server_settings = settings.clone();
268 this.available_context_servers_changed(cx);
269 }),
270 ]
271 } else {
272 Vec::new()
273 };
274
275 let mut this = Self {
276 _subscriptions: subscriptions,
277 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
278 .context_servers
279 .clone(),
280 worktree_store,
281 project: weak_project,
282 registry,
283 needs_server_update: false,
284 servers: HashMap::default(),
285 update_servers_task: None,
286 context_server_factory,
287 };
288 if maintain_server_loop {
289 this.available_context_servers_changed(cx);
290 }
291 this
292 }
293
294 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
295 self.servers.get(id).map(|state| state.server())
296 }
297
298 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
299 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
300 Some(server.clone())
301 } else {
302 None
303 }
304 }
305
306 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
307 self.servers.get(id).map(ContextServerStatus::from_state)
308 }
309
310 pub fn configuration_for_server(
311 &self,
312 id: &ContextServerId,
313 ) -> Option<Arc<ContextServerConfiguration>> {
314 self.servers.get(id).map(|state| state.configuration())
315 }
316
317 pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
318 self.servers
319 .keys()
320 .cloned()
321 .chain(
322 self.registry
323 .read(cx)
324 .context_server_descriptors()
325 .into_iter()
326 .map(|(id, _)| ContextServerId(id)),
327 )
328 .collect()
329 }
330
331 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
332 self.servers
333 .values()
334 .filter_map(|state| {
335 if let ContextServerState::Running { server, .. } = state {
336 Some(server.clone())
337 } else {
338 None
339 }
340 })
341 .collect()
342 }
343
344 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
345 cx.spawn(async move |this, cx| {
346 let this = this.upgrade().context("Context server store dropped")?;
347 let settings = this
348 .update(cx, |this, _| {
349 this.context_server_settings.get(&server.id().0).cloned()
350 })
351 .context("Failed to get context server settings")?;
352
353 if !settings.enabled() {
354 return anyhow::Ok(());
355 }
356
357 let (registry, worktree_store) = this.update(cx, |this, _| {
358 (this.registry.clone(), this.worktree_store.clone())
359 });
360 let configuration = ContextServerConfiguration::from_settings(
361 settings,
362 server.id(),
363 registry,
364 worktree_store,
365 cx,
366 )
367 .await
368 .context("Failed to create context server configuration")?;
369
370 this.update(cx, |this, cx| {
371 this.run_server(server, Arc::new(configuration), cx)
372 });
373 Ok(())
374 })
375 .detach_and_log_err(cx);
376 }
377
378 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
379 if matches!(
380 self.servers.get(id),
381 Some(ContextServerState::Stopped { .. })
382 ) {
383 return Ok(());
384 }
385
386 let state = self
387 .servers
388 .remove(id)
389 .context("Context server not found")?;
390
391 let server = state.server();
392 let configuration = state.configuration();
393 let mut result = Ok(());
394 if let ContextServerState::Running { server, .. } = &state {
395 result = server.stop();
396 }
397 drop(state);
398
399 self.update_server_state(
400 id.clone(),
401 ContextServerState::Stopped {
402 configuration,
403 server,
404 },
405 cx,
406 );
407
408 result
409 }
410
411 fn run_server(
412 &mut self,
413 server: Arc<ContextServer>,
414 configuration: Arc<ContextServerConfiguration>,
415 cx: &mut Context<Self>,
416 ) {
417 let id = server.id();
418 if matches!(
419 self.servers.get(&id),
420 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
421 ) {
422 self.stop_server(&id, cx).log_err();
423 }
424 let task = cx.spawn({
425 let id = server.id();
426 let server = server.clone();
427 let configuration = configuration.clone();
428
429 async move |this, cx| {
430 match server.clone().start(cx).await {
431 Ok(_) => {
432 debug_assert!(server.client().is_some());
433
434 this.update(cx, |this, cx| {
435 this.update_server_state(
436 id.clone(),
437 ContextServerState::Running {
438 server,
439 configuration,
440 },
441 cx,
442 )
443 })
444 .log_err()
445 }
446 Err(err) => {
447 log::error!("{} context server failed to start: {}", id, err);
448 this.update(cx, |this, cx| {
449 this.update_server_state(
450 id.clone(),
451 ContextServerState::Error {
452 configuration,
453 server,
454 error: err.to_string().into(),
455 },
456 cx,
457 )
458 })
459 .log_err()
460 }
461 };
462 }
463 });
464
465 self.update_server_state(
466 id.clone(),
467 ContextServerState::Starting {
468 configuration,
469 _task: task,
470 server,
471 },
472 cx,
473 );
474 }
475
476 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
477 let state = self
478 .servers
479 .remove(id)
480 .context("Context server not found")?;
481 drop(state);
482 cx.emit(Event::ServerStatusChanged {
483 server_id: id.clone(),
484 status: ContextServerStatus::Stopped,
485 });
486 Ok(())
487 }
488
489 fn create_context_server(
490 &self,
491 id: ContextServerId,
492 configuration: Arc<ContextServerConfiguration>,
493 cx: &mut Context<Self>,
494 ) -> Result<Arc<ContextServer>> {
495 let global_timeout =
496 Self::resolve_project_settings(&self.worktree_store, cx).context_server_timeout;
497
498 if let Some(factory) = self.context_server_factory.as_ref() {
499 return Ok(factory(id, configuration));
500 }
501
502 match configuration.as_ref() {
503 ContextServerConfiguration::Http {
504 url,
505 headers,
506 timeout,
507 } => Ok(Arc::new(ContextServer::http(
508 id,
509 url,
510 headers.clone(),
511 cx.http_client(),
512 cx.background_executor().clone(),
513 Some(Duration::from_secs(
514 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
515 )),
516 )?)),
517 _ => {
518 let root_path = self
519 .project
520 .read_with(cx, |project, cx| project.active_project_directory(cx))
521 .ok()
522 .flatten()
523 .or_else(|| {
524 self.worktree_store.read_with(cx, |store, cx| {
525 store.visible_worktrees(cx).fold(None, |acc, item| {
526 if acc.is_none() {
527 item.read(cx).root_dir()
528 } else {
529 acc
530 }
531 })
532 })
533 });
534
535 let mut command = configuration
536 .command()
537 .context("Missing command configuration for stdio context server")?
538 .clone();
539 command.timeout = Some(
540 command
541 .timeout
542 .unwrap_or(global_timeout)
543 .min(MAX_TIMEOUT_SECS),
544 );
545
546 Ok(Arc::new(ContextServer::stdio(id, command, root_path)))
547 }
548 }
549 }
550
551 fn resolve_project_settings<'a>(
552 worktree_store: &'a Entity<WorktreeStore>,
553 cx: &'a App,
554 ) -> &'a ProjectSettings {
555 let location = worktree_store
556 .read(cx)
557 .visible_worktrees(cx)
558 .next()
559 .map(|worktree| settings::SettingsLocation {
560 worktree_id: worktree.read(cx).id(),
561 path: RelPath::empty(),
562 });
563 ProjectSettings::get(location, cx)
564 }
565
566 fn update_server_state(
567 &mut self,
568 id: ContextServerId,
569 state: ContextServerState,
570 cx: &mut Context<Self>,
571 ) {
572 let status = ContextServerStatus::from_state(&state);
573 self.servers.insert(id.clone(), state);
574 cx.emit(Event::ServerStatusChanged {
575 server_id: id,
576 status,
577 });
578 }
579
580 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
581 if self.update_servers_task.is_some() {
582 self.needs_server_update = true;
583 } else {
584 self.needs_server_update = false;
585 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
586 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
587 log::error!("Error maintaining context servers: {}", err);
588 }
589
590 this.update(cx, |this, cx| {
591 this.update_servers_task.take();
592 if this.needs_server_update {
593 this.available_context_servers_changed(cx);
594 }
595 })?;
596
597 Ok(())
598 }));
599 }
600 }
601
602 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
603 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
604 (
605 this.context_server_settings.clone(),
606 this.registry.clone(),
607 this.worktree_store.clone(),
608 )
609 })?;
610
611 for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
612 configured_servers
613 .entry(id)
614 .or_insert(ContextServerSettings::default_extension());
615 }
616
617 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
618 configured_servers
619 .into_iter()
620 .partition(|(_, settings)| settings.enabled());
621
622 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
623 let id = ContextServerId(id);
624 ContextServerConfiguration::from_settings(
625 settings,
626 id.clone(),
627 registry.clone(),
628 worktree_store.clone(),
629 cx,
630 )
631 .map(|config| (id, config))
632 }))
633 .await
634 .into_iter()
635 .filter_map(|(id, config)| config.map(|config| (id, config)))
636 .collect::<HashMap<_, _>>();
637
638 let mut servers_to_start = Vec::new();
639 let mut servers_to_remove = HashSet::default();
640 let mut servers_to_stop = HashSet::default();
641
642 this.update(cx, |this, cx| {
643 for server_id in this.servers.keys() {
644 // All servers that are not in desired_servers should be removed from the store.
645 // This can happen if the user removed a server from the context server settings.
646 if !configured_servers.contains_key(server_id) {
647 if disabled_servers.contains_key(&server_id.0) {
648 servers_to_stop.insert(server_id.clone());
649 } else {
650 servers_to_remove.insert(server_id.clone());
651 }
652 }
653 }
654
655 for (id, config) in configured_servers {
656 let state = this.servers.get(&id);
657 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
658 let existing_config = state.as_ref().map(|state| state.configuration());
659 if existing_config.as_deref() != Some(&config) || is_stopped {
660 let config = Arc::new(config);
661 let server = this.create_context_server(id.clone(), config.clone(), cx)?;
662 servers_to_start.push((server, config));
663 if this.servers.contains_key(&id) {
664 servers_to_stop.insert(id);
665 }
666 }
667 }
668
669 anyhow::Ok(())
670 })??;
671
672 this.update(cx, |this, cx| {
673 for id in servers_to_stop {
674 this.stop_server(&id, cx)?;
675 }
676 for id in servers_to_remove {
677 this.remove_server(&id, cx)?;
678 }
679 for (server, config) in servers_to_start {
680 this.run_server(server, config, cx);
681 }
682 anyhow::Ok(())
683 })?
684 }
685}
686
687#[cfg(test)]
688mod tests {
689 use super::*;
690 use crate::{
691 FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
692 project_settings::ProjectSettings,
693 };
694 use context_server::test::create_fake_transport;
695 use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
696 use http_client::{FakeHttpClient, Response};
697 use serde_json::json;
698 use std::{cell::RefCell, path::PathBuf, rc::Rc};
699 use util::path;
700
701 #[gpui::test]
702 async fn test_context_server_status(cx: &mut TestAppContext) {
703 const SERVER_1_ID: &str = "mcp-1";
704 const SERVER_2_ID: &str = "mcp-2";
705
706 let (_fs, project) = setup_context_server_test(
707 cx,
708 json!({"code.rs": ""}),
709 vec![
710 (SERVER_1_ID.into(), dummy_server_settings()),
711 (SERVER_2_ID.into(), dummy_server_settings()),
712 ],
713 )
714 .await;
715
716 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
717 let store = cx.new(|cx| {
718 ContextServerStore::test(
719 registry.clone(),
720 project.read(cx).worktree_store(),
721 project.downgrade(),
722 cx,
723 )
724 });
725
726 let server_1_id = ContextServerId(SERVER_1_ID.into());
727 let server_2_id = ContextServerId(SERVER_2_ID.into());
728
729 let server_1 = Arc::new(ContextServer::new(
730 server_1_id.clone(),
731 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
732 ));
733 let server_2 = Arc::new(ContextServer::new(
734 server_2_id.clone(),
735 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
736 ));
737
738 store.update(cx, |store, cx| store.start_server(server_1, cx));
739
740 cx.run_until_parked();
741
742 cx.update(|cx| {
743 assert_eq!(
744 store.read(cx).status_for_server(&server_1_id),
745 Some(ContextServerStatus::Running)
746 );
747 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
748 });
749
750 store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
751
752 cx.run_until_parked();
753
754 cx.update(|cx| {
755 assert_eq!(
756 store.read(cx).status_for_server(&server_1_id),
757 Some(ContextServerStatus::Running)
758 );
759 assert_eq!(
760 store.read(cx).status_for_server(&server_2_id),
761 Some(ContextServerStatus::Running)
762 );
763 });
764
765 store
766 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
767 .unwrap();
768
769 cx.update(|cx| {
770 assert_eq!(
771 store.read(cx).status_for_server(&server_1_id),
772 Some(ContextServerStatus::Running)
773 );
774 assert_eq!(
775 store.read(cx).status_for_server(&server_2_id),
776 Some(ContextServerStatus::Stopped)
777 );
778 });
779 }
780
781 #[gpui::test]
782 async fn test_context_server_status_events(cx: &mut TestAppContext) {
783 const SERVER_1_ID: &str = "mcp-1";
784 const SERVER_2_ID: &str = "mcp-2";
785
786 let (_fs, project) = setup_context_server_test(
787 cx,
788 json!({"code.rs": ""}),
789 vec![
790 (SERVER_1_ID.into(), dummy_server_settings()),
791 (SERVER_2_ID.into(), dummy_server_settings()),
792 ],
793 )
794 .await;
795
796 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
797 let store = cx.new(|cx| {
798 ContextServerStore::test(
799 registry.clone(),
800 project.read(cx).worktree_store(),
801 project.downgrade(),
802 cx,
803 )
804 });
805
806 let server_1_id = ContextServerId(SERVER_1_ID.into());
807 let server_2_id = ContextServerId(SERVER_2_ID.into());
808
809 let server_1 = Arc::new(ContextServer::new(
810 server_1_id.clone(),
811 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
812 ));
813 let server_2 = Arc::new(ContextServer::new(
814 server_2_id.clone(),
815 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
816 ));
817
818 let _server_events = assert_server_events(
819 &store,
820 vec![
821 (server_1_id.clone(), ContextServerStatus::Starting),
822 (server_1_id, ContextServerStatus::Running),
823 (server_2_id.clone(), ContextServerStatus::Starting),
824 (server_2_id.clone(), ContextServerStatus::Running),
825 (server_2_id.clone(), ContextServerStatus::Stopped),
826 ],
827 cx,
828 );
829
830 store.update(cx, |store, cx| store.start_server(server_1, cx));
831
832 cx.run_until_parked();
833
834 store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
835
836 cx.run_until_parked();
837
838 store
839 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
840 .unwrap();
841 }
842
843 #[gpui::test(iterations = 25)]
844 async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
845 const SERVER_1_ID: &str = "mcp-1";
846
847 let (_fs, project) = setup_context_server_test(
848 cx,
849 json!({"code.rs": ""}),
850 vec![(SERVER_1_ID.into(), dummy_server_settings())],
851 )
852 .await;
853
854 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
855 let store = cx.new(|cx| {
856 ContextServerStore::test(
857 registry.clone(),
858 project.read(cx).worktree_store(),
859 project.downgrade(),
860 cx,
861 )
862 });
863
864 let server_id = ContextServerId(SERVER_1_ID.into());
865
866 let server_with_same_id_1 = Arc::new(ContextServer::new(
867 server_id.clone(),
868 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
869 ));
870 let server_with_same_id_2 = Arc::new(ContextServer::new(
871 server_id.clone(),
872 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
873 ));
874
875 // If we start another server with the same id, we should report that we stopped the previous one
876 let _server_events = assert_server_events(
877 &store,
878 vec![
879 (server_id.clone(), ContextServerStatus::Starting),
880 (server_id.clone(), ContextServerStatus::Stopped),
881 (server_id.clone(), ContextServerStatus::Starting),
882 (server_id.clone(), ContextServerStatus::Running),
883 ],
884 cx,
885 );
886
887 store.update(cx, |store, cx| {
888 store.start_server(server_with_same_id_1.clone(), cx)
889 });
890 store.update(cx, |store, cx| {
891 store.start_server(server_with_same_id_2.clone(), cx)
892 });
893
894 cx.run_until_parked();
895
896 cx.update(|cx| {
897 assert_eq!(
898 store.read(cx).status_for_server(&server_id),
899 Some(ContextServerStatus::Running)
900 );
901 });
902 }
903
904 #[gpui::test]
905 async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
906 const SERVER_1_ID: &str = "mcp-1";
907 const SERVER_2_ID: &str = "mcp-2";
908
909 let server_1_id = ContextServerId(SERVER_1_ID.into());
910 let server_2_id = ContextServerId(SERVER_2_ID.into());
911
912 let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
913
914 let (_fs, project) = setup_context_server_test(
915 cx,
916 json!({"code.rs": ""}),
917 vec![(
918 SERVER_1_ID.into(),
919 ContextServerSettings::Extension {
920 enabled: true,
921 settings: json!({
922 "somevalue": true
923 }),
924 },
925 )],
926 )
927 .await;
928
929 let executor = cx.executor();
930 let registry = cx.new(|cx| {
931 let mut registry = ContextServerDescriptorRegistry::new();
932 registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
933 registry
934 });
935 let store = cx.new(|cx| {
936 ContextServerStore::test_maintain_server_loop(
937 Some(Box::new(move |id, _| {
938 Arc::new(ContextServer::new(
939 id.clone(),
940 Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
941 ))
942 })),
943 registry.clone(),
944 project.read(cx).worktree_store(),
945 project.downgrade(),
946 cx,
947 )
948 });
949
950 // Ensure that mcp-1 starts up
951 {
952 let _server_events = assert_server_events(
953 &store,
954 vec![
955 (server_1_id.clone(), ContextServerStatus::Starting),
956 (server_1_id.clone(), ContextServerStatus::Running),
957 ],
958 cx,
959 );
960 cx.run_until_parked();
961 }
962
963 // Ensure that mcp-1 is restarted when the configuration was changed
964 {
965 let _server_events = assert_server_events(
966 &store,
967 vec![
968 (server_1_id.clone(), ContextServerStatus::Stopped),
969 (server_1_id.clone(), ContextServerStatus::Starting),
970 (server_1_id.clone(), ContextServerStatus::Running),
971 ],
972 cx,
973 );
974 set_context_server_configuration(
975 vec![(
976 server_1_id.0.clone(),
977 settings::ContextServerSettingsContent::Extension {
978 enabled: true,
979 settings: json!({
980 "somevalue": false
981 }),
982 },
983 )],
984 cx,
985 );
986
987 cx.run_until_parked();
988 }
989
990 // Ensure that mcp-1 is not restarted when the configuration was not changed
991 {
992 let _server_events = assert_server_events(&store, vec![], cx);
993 set_context_server_configuration(
994 vec![(
995 server_1_id.0.clone(),
996 settings::ContextServerSettingsContent::Extension {
997 enabled: true,
998 settings: json!({
999 "somevalue": false
1000 }),
1001 },
1002 )],
1003 cx,
1004 );
1005
1006 cx.run_until_parked();
1007 }
1008
1009 // Ensure that mcp-2 is started once it is added to the settings
1010 {
1011 let _server_events = assert_server_events(
1012 &store,
1013 vec![
1014 (server_2_id.clone(), ContextServerStatus::Starting),
1015 (server_2_id.clone(), ContextServerStatus::Running),
1016 ],
1017 cx,
1018 );
1019 set_context_server_configuration(
1020 vec![
1021 (
1022 server_1_id.0.clone(),
1023 settings::ContextServerSettingsContent::Extension {
1024 enabled: true,
1025 settings: json!({
1026 "somevalue": false
1027 }),
1028 },
1029 ),
1030 (
1031 server_2_id.0.clone(),
1032 settings::ContextServerSettingsContent::Stdio {
1033 enabled: true,
1034 command: ContextServerCommand {
1035 path: "somebinary".into(),
1036 args: vec!["arg".to_string()],
1037 env: None,
1038 timeout: None,
1039 },
1040 },
1041 ),
1042 ],
1043 cx,
1044 );
1045
1046 cx.run_until_parked();
1047 }
1048
1049 // Ensure that mcp-2 is restarted once the args have changed
1050 {
1051 let _server_events = assert_server_events(
1052 &store,
1053 vec![
1054 (server_2_id.clone(), ContextServerStatus::Stopped),
1055 (server_2_id.clone(), ContextServerStatus::Starting),
1056 (server_2_id.clone(), ContextServerStatus::Running),
1057 ],
1058 cx,
1059 );
1060 set_context_server_configuration(
1061 vec![
1062 (
1063 server_1_id.0.clone(),
1064 settings::ContextServerSettingsContent::Extension {
1065 enabled: true,
1066 settings: json!({
1067 "somevalue": false
1068 }),
1069 },
1070 ),
1071 (
1072 server_2_id.0.clone(),
1073 settings::ContextServerSettingsContent::Stdio {
1074 enabled: true,
1075 command: ContextServerCommand {
1076 path: "somebinary".into(),
1077 args: vec!["anotherArg".to_string()],
1078 env: None,
1079 timeout: None,
1080 },
1081 },
1082 ),
1083 ],
1084 cx,
1085 );
1086
1087 cx.run_until_parked();
1088 }
1089
1090 // Ensure that mcp-2 is removed once it is removed from the settings
1091 {
1092 let _server_events = assert_server_events(
1093 &store,
1094 vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1095 cx,
1096 );
1097 set_context_server_configuration(
1098 vec![(
1099 server_1_id.0.clone(),
1100 settings::ContextServerSettingsContent::Extension {
1101 enabled: true,
1102 settings: json!({
1103 "somevalue": false
1104 }),
1105 },
1106 )],
1107 cx,
1108 );
1109
1110 cx.run_until_parked();
1111
1112 cx.update(|cx| {
1113 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1114 });
1115 }
1116
1117 // Ensure that nothing happens if the settings do not change
1118 {
1119 let _server_events = assert_server_events(&store, vec![], cx);
1120 set_context_server_configuration(
1121 vec![(
1122 server_1_id.0.clone(),
1123 settings::ContextServerSettingsContent::Extension {
1124 enabled: true,
1125 settings: json!({
1126 "somevalue": false
1127 }),
1128 },
1129 )],
1130 cx,
1131 );
1132
1133 cx.run_until_parked();
1134
1135 cx.update(|cx| {
1136 assert_eq!(
1137 store.read(cx).status_for_server(&server_1_id),
1138 Some(ContextServerStatus::Running)
1139 );
1140 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1141 });
1142 }
1143 }
1144
1145 #[gpui::test]
1146 async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1147 const SERVER_1_ID: &str = "mcp-1";
1148
1149 let server_1_id = ContextServerId(SERVER_1_ID.into());
1150
1151 let (_fs, project) = setup_context_server_test(
1152 cx,
1153 json!({"code.rs": ""}),
1154 vec![(
1155 SERVER_1_ID.into(),
1156 ContextServerSettings::Stdio {
1157 enabled: true,
1158 command: ContextServerCommand {
1159 path: "somebinary".into(),
1160 args: vec!["arg".to_string()],
1161 env: None,
1162 timeout: None,
1163 },
1164 },
1165 )],
1166 )
1167 .await;
1168
1169 let executor = cx.executor();
1170 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1171 let store = cx.new(|cx| {
1172 ContextServerStore::test_maintain_server_loop(
1173 Some(Box::new(move |id, _| {
1174 Arc::new(ContextServer::new(
1175 id.clone(),
1176 Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1177 ))
1178 })),
1179 registry.clone(),
1180 project.read(cx).worktree_store(),
1181 project.downgrade(),
1182 cx,
1183 )
1184 });
1185
1186 // Ensure that mcp-1 starts up
1187 {
1188 let _server_events = assert_server_events(
1189 &store,
1190 vec![
1191 (server_1_id.clone(), ContextServerStatus::Starting),
1192 (server_1_id.clone(), ContextServerStatus::Running),
1193 ],
1194 cx,
1195 );
1196 cx.run_until_parked();
1197 }
1198
1199 // Ensure that mcp-1 is stopped once it is disabled.
1200 {
1201 let _server_events = assert_server_events(
1202 &store,
1203 vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1204 cx,
1205 );
1206 set_context_server_configuration(
1207 vec![(
1208 server_1_id.0.clone(),
1209 settings::ContextServerSettingsContent::Stdio {
1210 enabled: false,
1211 command: ContextServerCommand {
1212 path: "somebinary".into(),
1213 args: vec!["arg".to_string()],
1214 env: None,
1215 timeout: None,
1216 },
1217 },
1218 )],
1219 cx,
1220 );
1221
1222 cx.run_until_parked();
1223 }
1224
1225 // Ensure that mcp-1 is started once it is enabled again.
1226 {
1227 let _server_events = assert_server_events(
1228 &store,
1229 vec![
1230 (server_1_id.clone(), ContextServerStatus::Starting),
1231 (server_1_id.clone(), ContextServerStatus::Running),
1232 ],
1233 cx,
1234 );
1235 set_context_server_configuration(
1236 vec![(
1237 server_1_id.0.clone(),
1238 settings::ContextServerSettingsContent::Stdio {
1239 enabled: true,
1240 command: ContextServerCommand {
1241 path: "somebinary".into(),
1242 args: vec!["arg".to_string()],
1243 timeout: None,
1244 env: None,
1245 },
1246 },
1247 )],
1248 cx,
1249 );
1250
1251 cx.run_until_parked();
1252 }
1253 }
1254
1255 fn set_context_server_configuration(
1256 context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1257 cx: &mut TestAppContext,
1258 ) {
1259 cx.update(|cx| {
1260 SettingsStore::update_global(cx, |store, cx| {
1261 store.update_user_settings(cx, |content| {
1262 content.project.context_servers.clear();
1263 for (id, config) in context_servers {
1264 content.project.context_servers.insert(id, config);
1265 }
1266 });
1267 })
1268 });
1269 }
1270
1271 #[gpui::test]
1272 async fn test_remote_context_server(cx: &mut TestAppContext) {
1273 const SERVER_ID: &str = "remote-server";
1274 let server_id = ContextServerId(SERVER_ID.into());
1275 let server_url = "http://example.com/api";
1276
1277 let (_fs, project) = setup_context_server_test(
1278 cx,
1279 json!({ "code.rs": "" }),
1280 vec![(
1281 SERVER_ID.into(),
1282 ContextServerSettings::Http {
1283 enabled: true,
1284 url: server_url.to_string(),
1285 headers: Default::default(),
1286 timeout: None,
1287 },
1288 )],
1289 )
1290 .await;
1291
1292 let client = FakeHttpClient::create(|_| async move {
1293 use http_client::AsyncBody;
1294
1295 let response = Response::builder()
1296 .status(200)
1297 .header("Content-Type", "application/json")
1298 .body(AsyncBody::from(
1299 serde_json::to_string(&json!({
1300 "jsonrpc": "2.0",
1301 "id": 0,
1302 "result": {
1303 "protocolVersion": "2024-11-05",
1304 "capabilities": {},
1305 "serverInfo": {
1306 "name": "test-server",
1307 "version": "1.0.0"
1308 }
1309 }
1310 }))
1311 .unwrap(),
1312 ))
1313 .unwrap();
1314 Ok(response)
1315 });
1316 cx.update(|cx| cx.set_http_client(client));
1317 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1318 let store = cx.new(|cx| {
1319 ContextServerStore::test_maintain_server_loop(
1320 None,
1321 registry.clone(),
1322 project.read(cx).worktree_store(),
1323 project.downgrade(),
1324 cx,
1325 )
1326 });
1327
1328 let _server_events = assert_server_events(
1329 &store,
1330 vec![
1331 (server_id.clone(), ContextServerStatus::Starting),
1332 (server_id.clone(), ContextServerStatus::Running),
1333 ],
1334 cx,
1335 );
1336 cx.run_until_parked();
1337 }
1338
1339 struct ServerEvents {
1340 received_event_count: Rc<RefCell<usize>>,
1341 expected_event_count: usize,
1342 _subscription: Subscription,
1343 }
1344
1345 impl Drop for ServerEvents {
1346 fn drop(&mut self) {
1347 let actual_event_count = *self.received_event_count.borrow();
1348 assert_eq!(
1349 actual_event_count, self.expected_event_count,
1350 "
1351 Expected to receive {} context server store events, but received {} events",
1352 self.expected_event_count, actual_event_count
1353 );
1354 }
1355 }
1356
1357 #[gpui::test]
1358 async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1359 cx.update(|cx| {
1360 let settings_store = SettingsStore::test(cx);
1361 cx.set_global(settings_store);
1362 SettingsStore::update_global(cx, |store, cx| {
1363 store
1364 .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1365 .expect("Failed to set test user settings");
1366 });
1367 });
1368
1369 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1370
1371 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1372 let store = cx.new(|cx| {
1373 ContextServerStore::test(
1374 registry.clone(),
1375 project.read(cx).worktree_store(),
1376 project.downgrade(),
1377 cx,
1378 )
1379 });
1380
1381 let result = store.update(cx, |store, cx| {
1382 store.create_context_server(
1383 ContextServerId("test-server".into()),
1384 Arc::new(ContextServerConfiguration::Http {
1385 url: url::Url::parse("http://localhost:8080")
1386 .expect("Failed to parse test URL"),
1387 headers: Default::default(),
1388 timeout: None,
1389 }),
1390 cx,
1391 )
1392 });
1393
1394 assert!(
1395 result.is_ok(),
1396 "Server should be created successfully with global timeout"
1397 );
1398 }
1399
1400 #[gpui::test]
1401 async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1402 const SERVER_ID: &str = "test-server";
1403
1404 cx.update(|cx| {
1405 let settings_store = SettingsStore::test(cx);
1406 cx.set_global(settings_store);
1407 SettingsStore::update_global(cx, |store, cx| {
1408 store
1409 .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1410 .expect("Failed to set test user settings");
1411 });
1412 });
1413
1414 let (_fs, project) = setup_context_server_test(
1415 cx,
1416 json!({"code.rs": ""}),
1417 vec![(
1418 SERVER_ID.into(),
1419 ContextServerSettings::Http {
1420 enabled: true,
1421 url: "http://localhost:8080".to_string(),
1422 headers: Default::default(),
1423 timeout: Some(120),
1424 },
1425 )],
1426 )
1427 .await;
1428
1429 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1430 let store = cx.new(|cx| {
1431 ContextServerStore::test(
1432 registry.clone(),
1433 project.read(cx).worktree_store(),
1434 project.downgrade(),
1435 cx,
1436 )
1437 });
1438
1439 let result = store.update(cx, |store, cx| {
1440 store.create_context_server(
1441 ContextServerId("test-server".into()),
1442 Arc::new(ContextServerConfiguration::Http {
1443 url: url::Url::parse("http://localhost:8080")
1444 .expect("Failed to parse test URL"),
1445 headers: Default::default(),
1446 timeout: Some(120),
1447 }),
1448 cx,
1449 )
1450 });
1451
1452 assert!(
1453 result.is_ok(),
1454 "Server should be created successfully with per-server timeout override"
1455 );
1456 }
1457
1458 #[gpui::test]
1459 async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1460 const SERVER_ID: &str = "stdio-server";
1461
1462 let (_fs, project) = setup_context_server_test(
1463 cx,
1464 json!({"code.rs": ""}),
1465 vec![(
1466 SERVER_ID.into(),
1467 ContextServerSettings::Stdio {
1468 enabled: true,
1469 command: ContextServerCommand {
1470 path: "/usr/bin/node".into(),
1471 args: vec!["server.js".into()],
1472 env: None,
1473 timeout: Some(180000),
1474 },
1475 },
1476 )],
1477 )
1478 .await;
1479
1480 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1481 let store = cx.new(|cx| {
1482 ContextServerStore::test(
1483 registry.clone(),
1484 project.read(cx).worktree_store(),
1485 project.downgrade(),
1486 cx,
1487 )
1488 });
1489
1490 let result = store.update(cx, |store, cx| {
1491 store.create_context_server(
1492 ContextServerId("stdio-server".into()),
1493 Arc::new(ContextServerConfiguration::Custom {
1494 command: ContextServerCommand {
1495 path: "/usr/bin/node".into(),
1496 args: vec!["server.js".into()],
1497 env: None,
1498 timeout: Some(180000),
1499 },
1500 }),
1501 cx,
1502 )
1503 });
1504
1505 assert!(
1506 result.is_ok(),
1507 "Stdio server should be created successfully with timeout"
1508 );
1509 }
1510
1511 fn dummy_server_settings() -> ContextServerSettings {
1512 ContextServerSettings::Stdio {
1513 enabled: true,
1514 command: ContextServerCommand {
1515 path: "somebinary".into(),
1516 args: vec!["arg".to_string()],
1517 env: None,
1518 timeout: None,
1519 },
1520 }
1521 }
1522
1523 fn assert_server_events(
1524 store: &Entity<ContextServerStore>,
1525 expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1526 cx: &mut TestAppContext,
1527 ) -> ServerEvents {
1528 cx.update(|cx| {
1529 let mut ix = 0;
1530 let received_event_count = Rc::new(RefCell::new(0));
1531 let expected_event_count = expected_events.len();
1532 let subscription = cx.subscribe(store, {
1533 let received_event_count = received_event_count.clone();
1534 move |_, event, _| match event {
1535 Event::ServerStatusChanged {
1536 server_id: actual_server_id,
1537 status: actual_status,
1538 } => {
1539 let (expected_server_id, expected_status) = &expected_events[ix];
1540
1541 assert_eq!(
1542 actual_server_id, expected_server_id,
1543 "Expected different server id at index {}",
1544 ix
1545 );
1546 assert_eq!(
1547 actual_status, expected_status,
1548 "Expected different status at index {}",
1549 ix
1550 );
1551 ix += 1;
1552 *received_event_count.borrow_mut() += 1;
1553 }
1554 }
1555 });
1556 ServerEvents {
1557 expected_event_count,
1558 received_event_count,
1559 _subscription: subscription,
1560 }
1561 })
1562 }
1563
1564 async fn setup_context_server_test(
1565 cx: &mut TestAppContext,
1566 files: serde_json::Value,
1567 context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1568 ) -> (Arc<FakeFs>, Entity<Project>) {
1569 cx.update(|cx| {
1570 let settings_store = SettingsStore::test(cx);
1571 cx.set_global(settings_store);
1572 let mut settings = ProjectSettings::get_global(cx).clone();
1573 for (id, config) in context_server_configurations {
1574 settings.context_servers.insert(id, config);
1575 }
1576 ProjectSettings::override_global(settings, cx);
1577 });
1578
1579 let fs = FakeFs::new(cx.executor());
1580 fs.insert_tree(path!("/test"), files).await;
1581 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1582
1583 (fs, project)
1584 }
1585
1586 struct FakeContextServerDescriptor {
1587 path: PathBuf,
1588 }
1589
1590 impl FakeContextServerDescriptor {
1591 fn new(path: impl Into<PathBuf>) -> Self {
1592 Self { path: path.into() }
1593 }
1594 }
1595
1596 impl ContextServerDescriptor for FakeContextServerDescriptor {
1597 fn command(
1598 &self,
1599 _worktree_store: Entity<WorktreeStore>,
1600 _cx: &AsyncApp,
1601 ) -> Task<Result<ContextServerCommand>> {
1602 Task::ready(Ok(ContextServerCommand {
1603 path: self.path.clone(),
1604 args: vec!["arg1".to_string(), "arg2".to_string()],
1605 env: None,
1606 timeout: None,
1607 }))
1608 }
1609
1610 fn configuration(
1611 &self,
1612 _worktree_store: Entity<WorktreeStore>,
1613 _cx: &AsyncApp,
1614 ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1615 Task::ready(Ok(None))
1616 }
1617 }
1618}