1pub mod extension;
2pub mod registry;
3
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context as _, Result};
9use collections::{HashMap, HashSet};
10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
11use futures::{FutureExt as _, future::join_all};
12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
13use registry::ContextServerDescriptorRegistry;
14use remote::RemoteClient;
15use rpc::{AnyProtoClient, TypedEnvelope, proto};
16use settings::{Settings as _, SettingsStore};
17use util::{ResultExt as _, rel_path::RelPath};
18
19use crate::{
20 Project,
21 project_settings::{ContextServerSettings, ProjectSettings},
22 worktree_store::WorktreeStore,
23};
24
25/// Maximum timeout for context server requests
26/// Prevents extremely large timeout values from tying up resources indefinitely.
27const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
28
29pub fn init(cx: &mut App) {
30 extension::init(cx);
31}
32
33actions!(
34 context_server,
35 [
36 /// Restarts the context server.
37 Restart
38 ]
39);
40
41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub enum ContextServerStatus {
43 Starting,
44 Running,
45 Stopped,
46 Error(Arc<str>),
47}
48
49impl ContextServerStatus {
50 fn from_state(state: &ContextServerState) -> Self {
51 match state {
52 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
53 ContextServerState::Running { .. } => ContextServerStatus::Running,
54 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
55 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
56 }
57 }
58}
59
60enum ContextServerState {
61 Starting {
62 server: Arc<ContextServer>,
63 configuration: Arc<ContextServerConfiguration>,
64 _task: Task<()>,
65 },
66 Running {
67 server: Arc<ContextServer>,
68 configuration: Arc<ContextServerConfiguration>,
69 },
70 Stopped {
71 server: Arc<ContextServer>,
72 configuration: Arc<ContextServerConfiguration>,
73 },
74 Error {
75 server: Arc<ContextServer>,
76 configuration: Arc<ContextServerConfiguration>,
77 error: Arc<str>,
78 },
79}
80
81impl ContextServerState {
82 pub fn server(&self) -> Arc<ContextServer> {
83 match self {
84 ContextServerState::Starting { server, .. } => server.clone(),
85 ContextServerState::Running { server, .. } => server.clone(),
86 ContextServerState::Stopped { server, .. } => server.clone(),
87 ContextServerState::Error { server, .. } => server.clone(),
88 }
89 }
90
91 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
92 match self {
93 ContextServerState::Starting { configuration, .. } => configuration.clone(),
94 ContextServerState::Running { configuration, .. } => configuration.clone(),
95 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
96 ContextServerState::Error { configuration, .. } => configuration.clone(),
97 }
98 }
99}
100
101#[derive(Debug, PartialEq, Eq)]
102pub enum ContextServerConfiguration {
103 Custom {
104 command: ContextServerCommand,
105 remote: bool,
106 },
107 Extension {
108 command: ContextServerCommand,
109 settings: serde_json::Value,
110 remote: bool,
111 },
112 Http {
113 url: url::Url,
114 headers: HashMap<String, String>,
115 timeout: Option<u64>,
116 },
117}
118
119impl ContextServerConfiguration {
120 pub fn command(&self) -> Option<&ContextServerCommand> {
121 match self {
122 ContextServerConfiguration::Custom { command, .. } => Some(command),
123 ContextServerConfiguration::Extension { command, .. } => Some(command),
124 ContextServerConfiguration::Http { .. } => None,
125 }
126 }
127
128 pub fn remote(&self) -> bool {
129 match self {
130 ContextServerConfiguration::Custom { remote, .. } => *remote,
131 ContextServerConfiguration::Extension { remote, .. } => *remote,
132 ContextServerConfiguration::Http { .. } => false,
133 }
134 }
135
136 pub async fn from_settings(
137 settings: ContextServerSettings,
138 id: ContextServerId,
139 registry: Entity<ContextServerDescriptorRegistry>,
140 worktree_store: Entity<WorktreeStore>,
141 cx: &AsyncApp,
142 ) -> Option<Self> {
143 match settings {
144 ContextServerSettings::Stdio {
145 enabled: _,
146 command,
147 remote,
148 } => Some(ContextServerConfiguration::Custom { command, remote }),
149 ContextServerSettings::Extension {
150 enabled: _,
151 settings,
152 remote,
153 } => {
154 let descriptor =
155 cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
156
157 match descriptor.command(worktree_store, cx).await {
158 Ok(command) => Some(ContextServerConfiguration::Extension {
159 command,
160 settings,
161 remote,
162 }),
163 Err(e) => {
164 log::error!(
165 "Failed to create context server configuration from settings: {e:#}"
166 );
167 None
168 }
169 }
170 }
171 ContextServerSettings::Http {
172 enabled: _,
173 url,
174 headers: auth,
175 timeout,
176 } => {
177 let url = url::Url::parse(&url).log_err()?;
178 Some(ContextServerConfiguration::Http {
179 url,
180 headers: auth,
181 timeout,
182 })
183 }
184 }
185 }
186}
187
188pub type ContextServerFactory =
189 Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
190
191enum ContextServerStoreState {
192 Local {
193 downstream_client: Option<(u64, AnyProtoClient)>,
194 is_headless: bool,
195 },
196 Remote {
197 project_id: u64,
198 upstream_client: Entity<RemoteClient>,
199 },
200}
201
202pub struct ContextServerStore {
203 state: ContextServerStoreState,
204 context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
205 servers: HashMap<ContextServerId, ContextServerState>,
206 worktree_store: Entity<WorktreeStore>,
207 project: Option<WeakEntity<Project>>,
208 registry: Entity<ContextServerDescriptorRegistry>,
209 update_servers_task: Option<Task<Result<()>>>,
210 context_server_factory: Option<ContextServerFactory>,
211 needs_server_update: bool,
212 _subscriptions: Vec<Subscription>,
213}
214
215pub enum Event {
216 ServerStatusChanged {
217 server_id: ContextServerId,
218 status: ContextServerStatus,
219 },
220}
221
222impl EventEmitter<Event> for ContextServerStore {}
223
224impl ContextServerStore {
225 pub fn local(
226 worktree_store: Entity<WorktreeStore>,
227 weak_project: Option<WeakEntity<Project>>,
228 headless: bool,
229 cx: &mut Context<Self>,
230 ) -> Self {
231 Self::new_internal(
232 !headless,
233 None,
234 ContextServerDescriptorRegistry::default_global(cx),
235 worktree_store,
236 weak_project,
237 ContextServerStoreState::Local {
238 downstream_client: None,
239 is_headless: headless,
240 },
241 cx,
242 )
243 }
244
245 pub fn remote(
246 project_id: u64,
247 upstream_client: Entity<RemoteClient>,
248 worktree_store: Entity<WorktreeStore>,
249 weak_project: Option<WeakEntity<Project>>,
250 cx: &mut Context<Self>,
251 ) -> Self {
252 Self::new_internal(
253 true,
254 None,
255 ContextServerDescriptorRegistry::default_global(cx),
256 worktree_store,
257 weak_project,
258 ContextServerStoreState::Remote {
259 project_id,
260 upstream_client,
261 },
262 cx,
263 )
264 }
265
266 pub fn init_headless(session: &AnyProtoClient) {
267 session.add_entity_request_handler(Self::handle_get_context_server_command);
268 }
269
270 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
271 if let ContextServerStoreState::Local {
272 downstream_client, ..
273 } = &mut self.state
274 {
275 *downstream_client = Some((project_id, client));
276 }
277 }
278
279 pub fn is_remote_project(&self) -> bool {
280 matches!(self.state, ContextServerStoreState::Remote { .. })
281 }
282
283 /// Returns all configured context server ids, excluding the ones that are disabled
284 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
285 self.context_server_settings
286 .iter()
287 .filter(|(_, settings)| settings.enabled())
288 .map(|(id, _)| ContextServerId(id.clone()))
289 .collect()
290 }
291
292 #[cfg(any(test, feature = "test-support"))]
293 pub fn test(
294 registry: Entity<ContextServerDescriptorRegistry>,
295 worktree_store: Entity<WorktreeStore>,
296 weak_project: Option<WeakEntity<Project>>,
297 cx: &mut Context<Self>,
298 ) -> Self {
299 Self::new_internal(
300 false,
301 None,
302 registry,
303 worktree_store,
304 weak_project,
305 ContextServerStoreState::Local {
306 downstream_client: None,
307 is_headless: false,
308 },
309 cx,
310 )
311 }
312
313 #[cfg(any(test, feature = "test-support"))]
314 pub fn test_maintain_server_loop(
315 context_server_factory: Option<ContextServerFactory>,
316 registry: Entity<ContextServerDescriptorRegistry>,
317 worktree_store: Entity<WorktreeStore>,
318 weak_project: Option<WeakEntity<Project>>,
319 cx: &mut Context<Self>,
320 ) -> Self {
321 Self::new_internal(
322 true,
323 context_server_factory,
324 registry,
325 worktree_store,
326 weak_project,
327 ContextServerStoreState::Local {
328 downstream_client: None,
329 is_headless: false,
330 },
331 cx,
332 )
333 }
334
335 #[cfg(any(test, feature = "test-support"))]
336 pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
337 self.context_server_factory = Some(factory);
338 }
339
340 #[cfg(any(test, feature = "test-support"))]
341 pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
342 &self.registry
343 }
344
345 #[cfg(any(test, feature = "test-support"))]
346 pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
347 let configuration = Arc::new(ContextServerConfiguration::Custom {
348 command: ContextServerCommand {
349 path: "test".into(),
350 args: vec![],
351 env: None,
352 timeout: None,
353 },
354 remote: false,
355 });
356 self.run_server(server, configuration, cx);
357 }
358
359 fn new_internal(
360 maintain_server_loop: bool,
361 context_server_factory: Option<ContextServerFactory>,
362 registry: Entity<ContextServerDescriptorRegistry>,
363 worktree_store: Entity<WorktreeStore>,
364 weak_project: Option<WeakEntity<Project>>,
365 state: ContextServerStoreState,
366 cx: &mut Context<Self>,
367 ) -> Self {
368 let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
369 let settings =
370 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
371 if &this.context_server_settings == settings {
372 return;
373 }
374 this.context_server_settings = settings.clone();
375 if maintain_server_loop {
376 this.available_context_servers_changed(cx);
377 }
378 })];
379
380 if maintain_server_loop {
381 subscriptions.push(cx.observe(®istry, |this, _registry, cx| {
382 this.available_context_servers_changed(cx);
383 }));
384 }
385
386 let mut this = Self {
387 state,
388 _subscriptions: subscriptions,
389 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
390 .context_servers
391 .clone(),
392 worktree_store,
393 project: weak_project,
394 registry,
395 needs_server_update: false,
396 servers: HashMap::default(),
397 update_servers_task: None,
398 context_server_factory,
399 };
400 if maintain_server_loop {
401 this.available_context_servers_changed(cx);
402 }
403 this
404 }
405
406 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
407 self.servers.get(id).map(|state| state.server())
408 }
409
410 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
411 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
412 Some(server.clone())
413 } else {
414 None
415 }
416 }
417
418 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
419 self.servers.get(id).map(ContextServerStatus::from_state)
420 }
421
422 pub fn configuration_for_server(
423 &self,
424 id: &ContextServerId,
425 ) -> Option<Arc<ContextServerConfiguration>> {
426 self.servers.get(id).map(|state| state.configuration())
427 }
428
429 pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
430 self.servers
431 .keys()
432 .cloned()
433 .chain(
434 self.registry
435 .read(cx)
436 .context_server_descriptors()
437 .into_iter()
438 .map(|(id, _)| ContextServerId(id)),
439 )
440 .collect()
441 }
442
443 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
444 self.servers
445 .values()
446 .filter_map(|state| {
447 if let ContextServerState::Running { server, .. } = state {
448 Some(server.clone())
449 } else {
450 None
451 }
452 })
453 .collect()
454 }
455
456 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
457 cx.spawn(async move |this, cx| {
458 let this = this.upgrade().context("Context server store dropped")?;
459 let settings = this
460 .update(cx, |this, _| {
461 this.context_server_settings.get(&server.id().0).cloned()
462 })
463 .context("Failed to get context server settings")?;
464
465 if !settings.enabled() {
466 return anyhow::Ok(());
467 }
468
469 let (registry, worktree_store) = this.update(cx, |this, _| {
470 (this.registry.clone(), this.worktree_store.clone())
471 });
472 let configuration = ContextServerConfiguration::from_settings(
473 settings,
474 server.id(),
475 registry,
476 worktree_store,
477 cx,
478 )
479 .await
480 .context("Failed to create context server configuration")?;
481
482 this.update(cx, |this, cx| {
483 this.run_server(server, Arc::new(configuration), cx)
484 });
485 Ok(())
486 })
487 .detach_and_log_err(cx);
488 }
489
490 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
491 if matches!(
492 self.servers.get(id),
493 Some(ContextServerState::Stopped { .. })
494 ) {
495 return Ok(());
496 }
497
498 let state = self
499 .servers
500 .remove(id)
501 .context("Context server not found")?;
502
503 let server = state.server();
504 let configuration = state.configuration();
505 let mut result = Ok(());
506 if let ContextServerState::Running { server, .. } = &state {
507 result = server.stop();
508 }
509 drop(state);
510
511 self.update_server_state(
512 id.clone(),
513 ContextServerState::Stopped {
514 configuration,
515 server,
516 },
517 cx,
518 );
519
520 result
521 }
522
523 fn run_server(
524 &mut self,
525 server: Arc<ContextServer>,
526 configuration: Arc<ContextServerConfiguration>,
527 cx: &mut Context<Self>,
528 ) {
529 let id = server.id();
530 if matches!(
531 self.servers.get(&id),
532 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
533 ) {
534 self.stop_server(&id, cx).log_err();
535 }
536 let task = cx.spawn({
537 let id = server.id();
538 let server = server.clone();
539 let configuration = configuration.clone();
540
541 async move |this, cx| {
542 match server.clone().start(cx).await {
543 Ok(_) => {
544 debug_assert!(server.client().is_some());
545
546 this.update(cx, |this, cx| {
547 this.update_server_state(
548 id.clone(),
549 ContextServerState::Running {
550 server,
551 configuration,
552 },
553 cx,
554 )
555 })
556 .log_err()
557 }
558 Err(err) => {
559 log::error!("{} context server failed to start: {}", id, err);
560 this.update(cx, |this, cx| {
561 this.update_server_state(
562 id.clone(),
563 ContextServerState::Error {
564 configuration,
565 server,
566 error: err.to_string().into(),
567 },
568 cx,
569 )
570 })
571 .log_err()
572 }
573 };
574 }
575 });
576
577 self.update_server_state(
578 id.clone(),
579 ContextServerState::Starting {
580 configuration,
581 _task: task,
582 server,
583 },
584 cx,
585 );
586 }
587
588 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
589 let state = self
590 .servers
591 .remove(id)
592 .context("Context server not found")?;
593 drop(state);
594 cx.emit(Event::ServerStatusChanged {
595 server_id: id.clone(),
596 status: ContextServerStatus::Stopped,
597 });
598 Ok(())
599 }
600
601 async fn create_context_server(
602 this: WeakEntity<Self>,
603 id: ContextServerId,
604 configuration: Arc<ContextServerConfiguration>,
605 cx: &mut AsyncApp,
606 ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
607 let remote = configuration.remote();
608 let needs_remote_command = match configuration.as_ref() {
609 ContextServerConfiguration::Custom { .. }
610 | ContextServerConfiguration::Extension { .. } => remote,
611 ContextServerConfiguration::Http { .. } => false,
612 };
613
614 let (remote_state, is_remote_project) = this.update(cx, |this, _| {
615 let remote_state = match &this.state {
616 ContextServerStoreState::Remote {
617 project_id,
618 upstream_client,
619 } if needs_remote_command => Some((*project_id, upstream_client.clone())),
620 _ => None,
621 };
622 (remote_state, this.is_remote_project())
623 })?;
624
625 let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
626 this.project
627 .as_ref()
628 .and_then(|project| {
629 project
630 .read_with(cx, |project, cx| project.active_project_directory(cx))
631 .ok()
632 .flatten()
633 })
634 .or_else(|| {
635 this.worktree_store.read_with(cx, |store, cx| {
636 store.visible_worktrees(cx).fold(None, |acc, item| {
637 if acc.is_none() {
638 item.read(cx).root_dir()
639 } else {
640 acc
641 }
642 })
643 })
644 })
645 })?;
646
647 let configuration = if let Some((project_id, upstream_client)) = remote_state {
648 let root_dir = root_path.as_ref().map(|p| p.display().to_string());
649
650 let response = upstream_client
651 .update(cx, |client, _| {
652 client
653 .proto_client()
654 .request(proto::GetContextServerCommand {
655 project_id,
656 server_id: id.0.to_string(),
657 root_dir: root_dir.clone(),
658 })
659 })
660 .await?;
661
662 let remote_command = upstream_client.update(cx, |client, _| {
663 client.build_command(
664 Some(response.path),
665 &response.args,
666 &response.env.into_iter().collect(),
667 root_dir,
668 None,
669 )
670 })?;
671
672 let command = ContextServerCommand {
673 path: remote_command.program.into(),
674 args: remote_command.args,
675 env: Some(remote_command.env.into_iter().collect()),
676 timeout: None,
677 };
678
679 Arc::new(ContextServerConfiguration::Custom { command, remote })
680 } else {
681 configuration
682 };
683
684 let server: Arc<ContextServer> = this.update(cx, |this, cx| {
685 let global_timeout =
686 Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
687
688 if let Some(factory) = this.context_server_factory.as_ref() {
689 return anyhow::Ok(factory(id.clone(), configuration.clone()));
690 }
691
692 match configuration.as_ref() {
693 ContextServerConfiguration::Http {
694 url,
695 headers,
696 timeout,
697 } => anyhow::Ok(Arc::new(ContextServer::http(
698 id,
699 url,
700 headers.clone(),
701 cx.http_client(),
702 cx.background_executor().clone(),
703 Some(Duration::from_secs(
704 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
705 )),
706 )?)),
707 _ => {
708 let mut command = configuration
709 .command()
710 .context("Missing command configuration for stdio context server")?
711 .clone();
712 command.timeout = Some(
713 command
714 .timeout
715 .unwrap_or(global_timeout)
716 .min(MAX_TIMEOUT_SECS),
717 );
718
719 // Don't pass remote paths as working directory for locally-spawned processes
720 let working_directory = if is_remote_project { None } else { root_path };
721 anyhow::Ok(Arc::new(ContextServer::stdio(
722 id,
723 command,
724 working_directory,
725 )))
726 }
727 }
728 })??;
729
730 Ok((server, configuration))
731 }
732
733 async fn handle_get_context_server_command(
734 this: Entity<Self>,
735 envelope: TypedEnvelope<proto::GetContextServerCommand>,
736 mut cx: AsyncApp,
737 ) -> Result<proto::ContextServerCommand> {
738 let server_id = ContextServerId(envelope.payload.server_id.into());
739
740 let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
741 let ContextServerStoreState::Local {
742 is_headless: true, ..
743 } = &this.state
744 else {
745 anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
746 };
747
748 let settings = this
749 .context_server_settings
750 .get(&server_id.0)
751 .cloned()
752 .or_else(|| {
753 this.registry
754 .read(inner_cx)
755 .context_server_descriptor(&server_id.0)
756 .map(|_| ContextServerSettings::default_extension())
757 })
758 .with_context(|| format!("context server `{}` not found", server_id))?;
759
760 anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
761 })?;
762
763 let configuration = ContextServerConfiguration::from_settings(
764 settings,
765 server_id.clone(),
766 registry,
767 worktree_store,
768 &cx,
769 )
770 .await
771 .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
772
773 let command = configuration
774 .command()
775 .context("context server has no command (HTTP servers don't need RPC)")?;
776
777 Ok(proto::ContextServerCommand {
778 path: command.path.display().to_string(),
779 args: command.args.clone(),
780 env: command
781 .env
782 .clone()
783 .map(|env| env.into_iter().collect())
784 .unwrap_or_default(),
785 })
786 }
787
788 fn resolve_project_settings<'a>(
789 worktree_store: &'a Entity<WorktreeStore>,
790 cx: &'a App,
791 ) -> &'a ProjectSettings {
792 let location = worktree_store
793 .read(cx)
794 .visible_worktrees(cx)
795 .next()
796 .map(|worktree| settings::SettingsLocation {
797 worktree_id: worktree.read(cx).id(),
798 path: RelPath::empty(),
799 });
800 ProjectSettings::get(location, cx)
801 }
802
803 fn update_server_state(
804 &mut self,
805 id: ContextServerId,
806 state: ContextServerState,
807 cx: &mut Context<Self>,
808 ) {
809 let status = ContextServerStatus::from_state(&state);
810 self.servers.insert(id.clone(), state);
811 cx.emit(Event::ServerStatusChanged {
812 server_id: id,
813 status,
814 });
815 }
816
817 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
818 if self.update_servers_task.is_some() {
819 self.needs_server_update = true;
820 } else {
821 self.needs_server_update = false;
822 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
823 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
824 log::error!("Error maintaining context servers: {}", err);
825 }
826
827 this.update(cx, |this, cx| {
828 this.update_servers_task.take();
829 if this.needs_server_update {
830 this.available_context_servers_changed(cx);
831 }
832 })?;
833
834 Ok(())
835 }));
836 }
837 }
838
839 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
840 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
841 (
842 this.context_server_settings.clone(),
843 this.registry.clone(),
844 this.worktree_store.clone(),
845 )
846 })?;
847
848 for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
849 configured_servers
850 .entry(id)
851 .or_insert(ContextServerSettings::default_extension());
852 }
853
854 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
855 configured_servers
856 .into_iter()
857 .partition(|(_, settings)| settings.enabled());
858
859 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
860 let id = ContextServerId(id);
861 ContextServerConfiguration::from_settings(
862 settings,
863 id.clone(),
864 registry.clone(),
865 worktree_store.clone(),
866 cx,
867 )
868 .map(move |config| (id, config))
869 }))
870 .await
871 .into_iter()
872 .filter_map(|(id, config)| config.map(|config| (id, config)))
873 .collect::<HashMap<_, _>>();
874
875 let mut servers_to_start = Vec::new();
876 let mut servers_to_remove = HashSet::default();
877 let mut servers_to_stop = HashSet::default();
878
879 this.update(cx, |this, _cx| {
880 for server_id in this.servers.keys() {
881 // All servers that are not in desired_servers should be removed from the store.
882 // This can happen if the user removed a server from the context server settings.
883 if !configured_servers.contains_key(server_id) {
884 if disabled_servers.contains_key(&server_id.0) {
885 servers_to_stop.insert(server_id.clone());
886 } else {
887 servers_to_remove.insert(server_id.clone());
888 }
889 }
890 }
891
892 for (id, config) in configured_servers {
893 let state = this.servers.get(&id);
894 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
895 let existing_config = state.as_ref().map(|state| state.configuration());
896 if existing_config.as_deref() != Some(&config) || is_stopped {
897 let config = Arc::new(config);
898 servers_to_start.push((id.clone(), config));
899 if this.servers.contains_key(&id) {
900 servers_to_stop.insert(id);
901 }
902 }
903 }
904
905 anyhow::Ok(())
906 })??;
907
908 this.update(cx, |this, inner_cx| {
909 for id in servers_to_stop {
910 this.stop_server(&id, inner_cx)?;
911 }
912 for id in servers_to_remove {
913 this.remove_server(&id, inner_cx)?;
914 }
915 anyhow::Ok(())
916 })??;
917
918 for (id, config) in servers_to_start {
919 let (server, config) =
920 Self::create_context_server(this.clone(), id, config, cx).await?;
921 this.update(cx, |this, cx| {
922 this.run_server(server, config, cx);
923 })?;
924 }
925
926 Ok(())
927 }
928}
929
930#[cfg(test)]
931mod tests {
932 use super::*;
933 use crate::{
934 FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
935 project_settings::ProjectSettings,
936 };
937 use context_server::test::create_fake_transport;
938 use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
939 use http_client::{FakeHttpClient, Response};
940 use serde_json::json;
941 use std::{cell::RefCell, path::PathBuf, rc::Rc};
942 use util::path;
943
944 #[gpui::test]
945 async fn test_context_server_status(cx: &mut TestAppContext) {
946 const SERVER_1_ID: &str = "mcp-1";
947 const SERVER_2_ID: &str = "mcp-2";
948
949 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
950
951 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
952 let store = cx.new(|cx| {
953 ContextServerStore::test(
954 registry.clone(),
955 project.read(cx).worktree_store(),
956 Some(project.downgrade()),
957 cx,
958 )
959 });
960
961 let server_1_id = ContextServerId(SERVER_1_ID.into());
962 let server_2_id = ContextServerId(SERVER_2_ID.into());
963
964 let server_1 = Arc::new(ContextServer::new(
965 server_1_id.clone(),
966 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
967 ));
968 let server_2 = Arc::new(ContextServer::new(
969 server_2_id.clone(),
970 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
971 ));
972
973 store.update(cx, |store, cx| store.test_start_server(server_1, cx));
974
975 cx.run_until_parked();
976
977 cx.update(|cx| {
978 assert_eq!(
979 store.read(cx).status_for_server(&server_1_id),
980 Some(ContextServerStatus::Running)
981 );
982 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
983 });
984
985 store.update(cx, |store, cx| {
986 store.test_start_server(server_2.clone(), cx)
987 });
988
989 cx.run_until_parked();
990
991 cx.update(|cx| {
992 assert_eq!(
993 store.read(cx).status_for_server(&server_1_id),
994 Some(ContextServerStatus::Running)
995 );
996 assert_eq!(
997 store.read(cx).status_for_server(&server_2_id),
998 Some(ContextServerStatus::Running)
999 );
1000 });
1001
1002 store
1003 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
1004 .unwrap();
1005
1006 cx.update(|cx| {
1007 assert_eq!(
1008 store.read(cx).status_for_server(&server_1_id),
1009 Some(ContextServerStatus::Running)
1010 );
1011 assert_eq!(
1012 store.read(cx).status_for_server(&server_2_id),
1013 Some(ContextServerStatus::Stopped)
1014 );
1015 });
1016 }
1017
1018 #[gpui::test]
1019 async fn test_context_server_status_events(cx: &mut TestAppContext) {
1020 const SERVER_1_ID: &str = "mcp-1";
1021 const SERVER_2_ID: &str = "mcp-2";
1022
1023 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1024
1025 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1026 let store = cx.new(|cx| {
1027 ContextServerStore::test(
1028 registry.clone(),
1029 project.read(cx).worktree_store(),
1030 Some(project.downgrade()),
1031 cx,
1032 )
1033 });
1034
1035 let server_1_id = ContextServerId(SERVER_1_ID.into());
1036 let server_2_id = ContextServerId(SERVER_2_ID.into());
1037
1038 let server_1 = Arc::new(ContextServer::new(
1039 server_1_id.clone(),
1040 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1041 ));
1042 let server_2 = Arc::new(ContextServer::new(
1043 server_2_id.clone(),
1044 Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
1045 ));
1046
1047 let _server_events = assert_server_events(
1048 &store,
1049 vec![
1050 (server_1_id.clone(), ContextServerStatus::Starting),
1051 (server_1_id, ContextServerStatus::Running),
1052 (server_2_id.clone(), ContextServerStatus::Starting),
1053 (server_2_id.clone(), ContextServerStatus::Running),
1054 (server_2_id.clone(), ContextServerStatus::Stopped),
1055 ],
1056 cx,
1057 );
1058
1059 store.update(cx, |store, cx| store.test_start_server(server_1, cx));
1060
1061 cx.run_until_parked();
1062
1063 store.update(cx, |store, cx| {
1064 store.test_start_server(server_2.clone(), cx)
1065 });
1066
1067 cx.run_until_parked();
1068
1069 store
1070 .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
1071 .unwrap();
1072 }
1073
1074 #[gpui::test(iterations = 25)]
1075 async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
1076 const SERVER_1_ID: &str = "mcp-1";
1077
1078 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1079
1080 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1081 let store = cx.new(|cx| {
1082 ContextServerStore::test(
1083 registry.clone(),
1084 project.read(cx).worktree_store(),
1085 Some(project.downgrade()),
1086 cx,
1087 )
1088 });
1089
1090 let server_id = ContextServerId(SERVER_1_ID.into());
1091
1092 let server_with_same_id_1 = Arc::new(ContextServer::new(
1093 server_id.clone(),
1094 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1095 ));
1096 let server_with_same_id_2 = Arc::new(ContextServer::new(
1097 server_id.clone(),
1098 Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1099 ));
1100
1101 // If we start another server with the same id, we should report that we stopped the previous one
1102 let _server_events = assert_server_events(
1103 &store,
1104 vec![
1105 (server_id.clone(), ContextServerStatus::Starting),
1106 (server_id.clone(), ContextServerStatus::Stopped),
1107 (server_id.clone(), ContextServerStatus::Starting),
1108 (server_id.clone(), ContextServerStatus::Running),
1109 ],
1110 cx,
1111 );
1112
1113 store.update(cx, |store, cx| {
1114 store.test_start_server(server_with_same_id_1.clone(), cx)
1115 });
1116 store.update(cx, |store, cx| {
1117 store.test_start_server(server_with_same_id_2.clone(), cx)
1118 });
1119
1120 cx.run_until_parked();
1121
1122 cx.update(|cx| {
1123 assert_eq!(
1124 store.read(cx).status_for_server(&server_id),
1125 Some(ContextServerStatus::Running)
1126 );
1127 });
1128 }
1129
1130 #[gpui::test]
1131 async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
1132 const SERVER_1_ID: &str = "mcp-1";
1133 const SERVER_2_ID: &str = "mcp-2";
1134
1135 let server_1_id = ContextServerId(SERVER_1_ID.into());
1136 let server_2_id = ContextServerId(SERVER_2_ID.into());
1137
1138 let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
1139
1140 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1141
1142 let executor = cx.executor();
1143 let store = project.read_with(cx, |project, _| project.context_server_store());
1144 store.update(cx, |store, cx| {
1145 store.set_context_server_factory(Box::new(move |id, _| {
1146 Arc::new(ContextServer::new(
1147 id.clone(),
1148 Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1149 ))
1150 }));
1151 store.registry().update(cx, |registry, cx| {
1152 registry.register_context_server_descriptor(
1153 SERVER_1_ID.into(),
1154 fake_descriptor_1,
1155 cx,
1156 );
1157 });
1158 });
1159
1160 set_context_server_configuration(
1161 vec![(
1162 server_1_id.0.clone(),
1163 settings::ContextServerSettingsContent::Extension {
1164 enabled: true,
1165 remote: false,
1166 settings: json!({
1167 "somevalue": true
1168 }),
1169 },
1170 )],
1171 cx,
1172 );
1173
1174 // Ensure that mcp-1 starts up
1175 {
1176 let _server_events = assert_server_events(
1177 &store,
1178 vec![
1179 (server_1_id.clone(), ContextServerStatus::Starting),
1180 (server_1_id.clone(), ContextServerStatus::Running),
1181 ],
1182 cx,
1183 );
1184 cx.run_until_parked();
1185 }
1186
1187 // Ensure that mcp-1 is restarted when the configuration was changed
1188 {
1189 let _server_events = assert_server_events(
1190 &store,
1191 vec![
1192 (server_1_id.clone(), ContextServerStatus::Stopped),
1193 (server_1_id.clone(), ContextServerStatus::Starting),
1194 (server_1_id.clone(), ContextServerStatus::Running),
1195 ],
1196 cx,
1197 );
1198 set_context_server_configuration(
1199 vec![(
1200 server_1_id.0.clone(),
1201 settings::ContextServerSettingsContent::Extension {
1202 enabled: true,
1203 remote: false,
1204 settings: json!({
1205 "somevalue": false
1206 }),
1207 },
1208 )],
1209 cx,
1210 );
1211
1212 cx.run_until_parked();
1213 }
1214
1215 // Ensure that mcp-1 is not restarted when the configuration was not changed
1216 {
1217 let _server_events = assert_server_events(&store, vec![], cx);
1218 set_context_server_configuration(
1219 vec![(
1220 server_1_id.0.clone(),
1221 settings::ContextServerSettingsContent::Extension {
1222 enabled: true,
1223 remote: false,
1224 settings: json!({
1225 "somevalue": false
1226 }),
1227 },
1228 )],
1229 cx,
1230 );
1231
1232 cx.run_until_parked();
1233 }
1234
1235 // Ensure that mcp-2 is started once it is added to the settings
1236 {
1237 let _server_events = assert_server_events(
1238 &store,
1239 vec![
1240 (server_2_id.clone(), ContextServerStatus::Starting),
1241 (server_2_id.clone(), ContextServerStatus::Running),
1242 ],
1243 cx,
1244 );
1245 set_context_server_configuration(
1246 vec![
1247 (
1248 server_1_id.0.clone(),
1249 settings::ContextServerSettingsContent::Extension {
1250 enabled: true,
1251 remote: false,
1252 settings: json!({
1253 "somevalue": false
1254 }),
1255 },
1256 ),
1257 (
1258 server_2_id.0.clone(),
1259 settings::ContextServerSettingsContent::Stdio {
1260 enabled: true,
1261 remote: false,
1262 command: ContextServerCommand {
1263 path: "somebinary".into(),
1264 args: vec!["arg".to_string()],
1265 env: None,
1266 timeout: None,
1267 },
1268 },
1269 ),
1270 ],
1271 cx,
1272 );
1273
1274 cx.run_until_parked();
1275 }
1276
1277 // Ensure that mcp-2 is restarted once the args have changed
1278 {
1279 let _server_events = assert_server_events(
1280 &store,
1281 vec![
1282 (server_2_id.clone(), ContextServerStatus::Stopped),
1283 (server_2_id.clone(), ContextServerStatus::Starting),
1284 (server_2_id.clone(), ContextServerStatus::Running),
1285 ],
1286 cx,
1287 );
1288 set_context_server_configuration(
1289 vec![
1290 (
1291 server_1_id.0.clone(),
1292 settings::ContextServerSettingsContent::Extension {
1293 enabled: true,
1294 remote: false,
1295 settings: json!({
1296 "somevalue": false
1297 }),
1298 },
1299 ),
1300 (
1301 server_2_id.0.clone(),
1302 settings::ContextServerSettingsContent::Stdio {
1303 enabled: true,
1304 remote: false,
1305 command: ContextServerCommand {
1306 path: "somebinary".into(),
1307 args: vec!["anotherArg".to_string()],
1308 env: None,
1309 timeout: None,
1310 },
1311 },
1312 ),
1313 ],
1314 cx,
1315 );
1316
1317 cx.run_until_parked();
1318 }
1319
1320 // Ensure that mcp-2 is removed once it is removed from the settings
1321 {
1322 let _server_events = assert_server_events(
1323 &store,
1324 vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1325 cx,
1326 );
1327 set_context_server_configuration(
1328 vec![(
1329 server_1_id.0.clone(),
1330 settings::ContextServerSettingsContent::Extension {
1331 enabled: true,
1332 remote: false,
1333 settings: json!({
1334 "somevalue": false
1335 }),
1336 },
1337 )],
1338 cx,
1339 );
1340
1341 cx.run_until_parked();
1342
1343 cx.update(|cx| {
1344 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1345 });
1346 }
1347
1348 // Ensure that nothing happens if the settings do not change
1349 {
1350 let _server_events = assert_server_events(&store, vec![], cx);
1351 set_context_server_configuration(
1352 vec![(
1353 server_1_id.0.clone(),
1354 settings::ContextServerSettingsContent::Extension {
1355 enabled: true,
1356 remote: false,
1357 settings: json!({
1358 "somevalue": false
1359 }),
1360 },
1361 )],
1362 cx,
1363 );
1364
1365 cx.run_until_parked();
1366
1367 cx.update(|cx| {
1368 assert_eq!(
1369 store.read(cx).status_for_server(&server_1_id),
1370 Some(ContextServerStatus::Running)
1371 );
1372 assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1373 });
1374 }
1375 }
1376
1377 #[gpui::test]
1378 async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1379 const SERVER_1_ID: &str = "mcp-1";
1380
1381 let server_1_id = ContextServerId(SERVER_1_ID.into());
1382
1383 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1384
1385 let executor = cx.executor();
1386 let store = project.read_with(cx, |project, _| project.context_server_store());
1387 store.update(cx, |store, _| {
1388 store.set_context_server_factory(Box::new(move |id, _| {
1389 Arc::new(ContextServer::new(
1390 id.clone(),
1391 Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1392 ))
1393 }));
1394 });
1395
1396 set_context_server_configuration(
1397 vec![(
1398 server_1_id.0.clone(),
1399 settings::ContextServerSettingsContent::Stdio {
1400 enabled: true,
1401 remote: false,
1402 command: ContextServerCommand {
1403 path: "somebinary".into(),
1404 args: vec!["arg".to_string()],
1405 env: None,
1406 timeout: None,
1407 },
1408 },
1409 )],
1410 cx,
1411 );
1412
1413 // Ensure that mcp-1 starts up
1414 {
1415 let _server_events = assert_server_events(
1416 &store,
1417 vec![
1418 (server_1_id.clone(), ContextServerStatus::Starting),
1419 (server_1_id.clone(), ContextServerStatus::Running),
1420 ],
1421 cx,
1422 );
1423 cx.run_until_parked();
1424 }
1425
1426 // Ensure that mcp-1 is stopped once it is disabled.
1427 {
1428 let _server_events = assert_server_events(
1429 &store,
1430 vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1431 cx,
1432 );
1433 set_context_server_configuration(
1434 vec![(
1435 server_1_id.0.clone(),
1436 settings::ContextServerSettingsContent::Stdio {
1437 enabled: false,
1438 remote: false,
1439 command: ContextServerCommand {
1440 path: "somebinary".into(),
1441 args: vec!["arg".to_string()],
1442 env: None,
1443 timeout: None,
1444 },
1445 },
1446 )],
1447 cx,
1448 );
1449
1450 cx.run_until_parked();
1451 }
1452
1453 // Ensure that mcp-1 is started once it is enabled again.
1454 {
1455 let _server_events = assert_server_events(
1456 &store,
1457 vec![
1458 (server_1_id.clone(), ContextServerStatus::Starting),
1459 (server_1_id.clone(), ContextServerStatus::Running),
1460 ],
1461 cx,
1462 );
1463 set_context_server_configuration(
1464 vec![(
1465 server_1_id.0.clone(),
1466 settings::ContextServerSettingsContent::Stdio {
1467 enabled: true,
1468 remote: false,
1469 command: ContextServerCommand {
1470 path: "somebinary".into(),
1471 args: vec!["arg".to_string()],
1472 timeout: None,
1473 env: None,
1474 },
1475 },
1476 )],
1477 cx,
1478 );
1479
1480 cx.run_until_parked();
1481 }
1482 }
1483
1484 fn set_context_server_configuration(
1485 context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1486 cx: &mut TestAppContext,
1487 ) {
1488 cx.update(|cx| {
1489 SettingsStore::update_global(cx, |store, cx| {
1490 store.update_user_settings(cx, |content| {
1491 content.project.context_servers.clear();
1492 for (id, config) in context_servers {
1493 content.project.context_servers.insert(id, config);
1494 }
1495 });
1496 })
1497 });
1498 }
1499
1500 #[gpui::test]
1501 async fn test_remote_context_server(cx: &mut TestAppContext) {
1502 const SERVER_ID: &str = "remote-server";
1503 let server_id = ContextServerId(SERVER_ID.into());
1504 let server_url = "http://example.com/api";
1505
1506 let client = FakeHttpClient::create(|_| async move {
1507 use http_client::AsyncBody;
1508
1509 let response = Response::builder()
1510 .status(200)
1511 .header("Content-Type", "application/json")
1512 .body(AsyncBody::from(
1513 serde_json::to_string(&json!({
1514 "jsonrpc": "2.0",
1515 "id": 0,
1516 "result": {
1517 "protocolVersion": "2024-11-05",
1518 "capabilities": {},
1519 "serverInfo": {
1520 "name": "test-server",
1521 "version": "1.0.0"
1522 }
1523 }
1524 }))
1525 .unwrap(),
1526 ))
1527 .unwrap();
1528 Ok(response)
1529 });
1530 cx.update(|cx| cx.set_http_client(client));
1531
1532 let (_fs, project) = setup_context_server_test(cx, json!({ "code.rs": "" }), vec![]).await;
1533
1534 let store = project.read_with(cx, |project, _| project.context_server_store());
1535
1536 set_context_server_configuration(
1537 vec![(
1538 server_id.0.clone(),
1539 settings::ContextServerSettingsContent::Http {
1540 enabled: true,
1541 url: server_url.to_string(),
1542 headers: Default::default(),
1543 timeout: None,
1544 },
1545 )],
1546 cx,
1547 );
1548
1549 let _server_events = assert_server_events(
1550 &store,
1551 vec![
1552 (server_id.clone(), ContextServerStatus::Starting),
1553 (server_id.clone(), ContextServerStatus::Running),
1554 ],
1555 cx,
1556 );
1557 cx.run_until_parked();
1558 }
1559
1560 struct ServerEvents {
1561 received_event_count: Rc<RefCell<usize>>,
1562 expected_event_count: usize,
1563 _subscription: Subscription,
1564 }
1565
1566 impl Drop for ServerEvents {
1567 fn drop(&mut self) {
1568 let actual_event_count = *self.received_event_count.borrow();
1569 assert_eq!(
1570 actual_event_count, self.expected_event_count,
1571 "
1572 Expected to receive {} context server store events, but received {} events",
1573 self.expected_event_count, actual_event_count
1574 );
1575 }
1576 }
1577
1578 #[gpui::test]
1579 async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1580 cx.update(|cx| {
1581 let settings_store = SettingsStore::test(cx);
1582 cx.set_global(settings_store);
1583 SettingsStore::update_global(cx, |store, cx| {
1584 store
1585 .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1586 .expect("Failed to set test user settings");
1587 });
1588 });
1589
1590 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1591
1592 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1593 let store = cx.new(|cx| {
1594 ContextServerStore::test(
1595 registry.clone(),
1596 project.read(cx).worktree_store(),
1597 Some(project.downgrade()),
1598 cx,
1599 )
1600 });
1601
1602 let mut async_cx = cx.to_async();
1603 let result = ContextServerStore::create_context_server(
1604 store.downgrade(),
1605 ContextServerId("test-server".into()),
1606 Arc::new(ContextServerConfiguration::Http {
1607 url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
1608 headers: Default::default(),
1609 timeout: None,
1610 }),
1611 &mut async_cx,
1612 )
1613 .await;
1614
1615 assert!(
1616 result.is_ok(),
1617 "Server should be created successfully with global timeout"
1618 );
1619 }
1620
1621 #[gpui::test]
1622 async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1623 const SERVER_ID: &str = "test-server";
1624
1625 cx.update(|cx| {
1626 let settings_store = SettingsStore::test(cx);
1627 cx.set_global(settings_store);
1628 SettingsStore::update_global(cx, |store, cx| {
1629 store
1630 .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1631 .expect("Failed to set test user settings");
1632 });
1633 });
1634
1635 let (_fs, project) = setup_context_server_test(
1636 cx,
1637 json!({"code.rs": ""}),
1638 vec![(
1639 SERVER_ID.into(),
1640 ContextServerSettings::Http {
1641 enabled: true,
1642 url: "http://localhost:8080".to_string(),
1643 headers: Default::default(),
1644 timeout: Some(120),
1645 },
1646 )],
1647 )
1648 .await;
1649
1650 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1651 let store = cx.new(|cx| {
1652 ContextServerStore::test(
1653 registry.clone(),
1654 project.read(cx).worktree_store(),
1655 Some(project.downgrade()),
1656 cx,
1657 )
1658 });
1659
1660 let mut async_cx = cx.to_async();
1661 let result = ContextServerStore::create_context_server(
1662 store.downgrade(),
1663 ContextServerId("test-server".into()),
1664 Arc::new(ContextServerConfiguration::Http {
1665 url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
1666 headers: Default::default(),
1667 timeout: Some(120),
1668 }),
1669 &mut async_cx,
1670 )
1671 .await;
1672
1673 assert!(
1674 result.is_ok(),
1675 "Server should be created successfully with per-server timeout override"
1676 );
1677 }
1678
1679 #[gpui::test]
1680 async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1681 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1682
1683 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1684 let store = cx.new(|cx| {
1685 ContextServerStore::test(
1686 registry.clone(),
1687 project.read(cx).worktree_store(),
1688 Some(project.downgrade()),
1689 cx,
1690 )
1691 });
1692
1693 let mut async_cx = cx.to_async();
1694 let result = ContextServerStore::create_context_server(
1695 store.downgrade(),
1696 ContextServerId("stdio-server".into()),
1697 Arc::new(ContextServerConfiguration::Custom {
1698 command: ContextServerCommand {
1699 path: "/usr/bin/node".into(),
1700 args: vec!["server.js".into()],
1701 env: None,
1702 timeout: Some(180000),
1703 },
1704 remote: false,
1705 }),
1706 &mut async_cx,
1707 )
1708 .await;
1709
1710 assert!(
1711 result.is_ok(),
1712 "Stdio server should be created successfully with timeout"
1713 );
1714 }
1715
1716 fn assert_server_events(
1717 store: &Entity<ContextServerStore>,
1718 expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1719 cx: &mut TestAppContext,
1720 ) -> ServerEvents {
1721 cx.update(|cx| {
1722 let mut ix = 0;
1723 let received_event_count = Rc::new(RefCell::new(0));
1724 let expected_event_count = expected_events.len();
1725 let subscription = cx.subscribe(store, {
1726 let received_event_count = received_event_count.clone();
1727 move |_, event, _| match event {
1728 Event::ServerStatusChanged {
1729 server_id: actual_server_id,
1730 status: actual_status,
1731 } => {
1732 let (expected_server_id, expected_status) = &expected_events[ix];
1733
1734 assert_eq!(
1735 actual_server_id, expected_server_id,
1736 "Expected different server id at index {}",
1737 ix
1738 );
1739 assert_eq!(
1740 actual_status, expected_status,
1741 "Expected different status at index {}",
1742 ix
1743 );
1744 ix += 1;
1745 *received_event_count.borrow_mut() += 1;
1746 }
1747 }
1748 });
1749 ServerEvents {
1750 expected_event_count,
1751 received_event_count,
1752 _subscription: subscription,
1753 }
1754 })
1755 }
1756
1757 async fn setup_context_server_test(
1758 cx: &mut TestAppContext,
1759 files: serde_json::Value,
1760 context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1761 ) -> (Arc<FakeFs>, Entity<Project>) {
1762 cx.update(|cx| {
1763 let settings_store = SettingsStore::test(cx);
1764 cx.set_global(settings_store);
1765 let mut settings = ProjectSettings::get_global(cx).clone();
1766 for (id, config) in context_server_configurations {
1767 settings.context_servers.insert(id, config);
1768 }
1769 ProjectSettings::override_global(settings, cx);
1770 });
1771
1772 let fs = FakeFs::new(cx.executor());
1773 fs.insert_tree(path!("/test"), files).await;
1774 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1775
1776 (fs, project)
1777 }
1778
1779 struct FakeContextServerDescriptor {
1780 path: PathBuf,
1781 }
1782
1783 impl FakeContextServerDescriptor {
1784 fn new(path: impl Into<PathBuf>) -> Self {
1785 Self { path: path.into() }
1786 }
1787 }
1788
1789 impl ContextServerDescriptor for FakeContextServerDescriptor {
1790 fn command(
1791 &self,
1792 _worktree_store: Entity<WorktreeStore>,
1793 _cx: &AsyncApp,
1794 ) -> Task<Result<ContextServerCommand>> {
1795 Task::ready(Ok(ContextServerCommand {
1796 path: self.path.clone(),
1797 args: vec!["arg1".to_string(), "arg2".to_string()],
1798 env: None,
1799 timeout: None,
1800 }))
1801 }
1802
1803 fn configuration(
1804 &self,
1805 _worktree_store: Entity<WorktreeStore>,
1806 _cx: &AsyncApp,
1807 ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1808 Task::ready(Ok(None))
1809 }
1810 }
1811}