manager.rs

//! This module implements a context server management system for Zed.
//!
//! It provides functionality to:
//! - Define and load context server settings
//! - Manage individual context servers (start, stop, restart)
//! - Maintain a global manager for all context servers
//!
//! Key components:
//! - `ContextServerSettings`: Defines the structure for server configurations
//! - `ContextServer`: Represents an individual context server
//! - `ContextServerManager`: Manages multiple context servers
//! - `GlobalContextServerManager`: Provides global access to the `ContextServerManager`
//!
//! The module also includes initialization logic to set up the context server system
//! and react to changes in settings.
 16
 17use collections::{HashMap, HashSet};
 18use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Task};
 19use log;
 20use parking_lot::RwLock;
 21use schemars::JsonSchema;
 22use serde::{Deserialize, Serialize};
 23use settings::{Settings, SettingsSources, SettingsStore};
 24use std::path::Path;
 25use std::sync::Arc;
 26
 27use crate::{
 28    client::{self, Client},
 29    types,
 30};
 31
 32#[derive(Deserialize, Serialize, Default, Clone, PartialEq, Eq, JsonSchema, Debug)]
 33pub struct ContextServerSettings {
 34    pub servers: Vec<ServerConfig>,
 35}
 36
 37#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema, Debug)]
 38pub struct ServerConfig {
 39    pub id: String,
 40    pub executable: String,
 41    pub args: Vec<String>,
 42    pub env: Option<HashMap<String, String>>,
 43}
 44
 45impl Settings for ContextServerSettings {
 46    const KEY: Option<&'static str> = Some("experimental.context_servers");
 47
 48    type FileContent = Self;
 49
 50    fn load(
 51        sources: SettingsSources<Self::FileContent>,
 52        _: &mut gpui::AppContext,
 53    ) -> anyhow::Result<Self> {
 54        sources.json_merge()
 55    }
 56}
 57
 58pub struct ContextServer {
 59    pub id: String,
 60    pub config: ServerConfig,
 61    pub client: RwLock<Option<Arc<crate::protocol::InitializedContextServerProtocol>>>,
 62}
 63
 64impl ContextServer {
 65    fn new(config: ServerConfig) -> Self {
 66        Self {
 67            id: config.id.clone(),
 68            config,
 69            client: RwLock::new(None),
 70        }
 71    }
 72
 73    async fn start(&self, cx: &AsyncAppContext) -> anyhow::Result<()> {
 74        log::info!("starting context server {}", self.config.id,);
 75        let client = Client::new(
 76            client::ContextServerId(self.config.id.clone()),
 77            client::ModelContextServerBinary {
 78                executable: Path::new(&self.config.executable).to_path_buf(),
 79                args: self.config.args.clone(),
 80                env: self.config.env.clone(),
 81            },
 82            cx.clone(),
 83        )?;
 84
 85        let protocol = crate::protocol::ModelContextProtocol::new(client);
 86        let client_info = types::EntityInfo {
 87            name: "Zed".to_string(),
 88            version: env!("CARGO_PKG_VERSION").to_string(),
 89        };
 90        let initialized_protocol = protocol.initialize(client_info).await?;
 91
 92        log::debug!(
 93            "context server {} initialized: {:?}",
 94            self.config.id,
 95            initialized_protocol.initialize,
 96        );
 97
 98        *self.client.write() = Some(Arc::new(initialized_protocol));
 99        Ok(())
100    }
101
102    async fn stop(&self) -> anyhow::Result<()> {
103        let mut client = self.client.write();
104        if let Some(protocol) = client.take() {
105            drop(protocol);
106        }
107        Ok(())
108    }
109}
110
111/// A Context server manager manages the starting and stopping
112/// of all servers. To obtain a server to interact with, a crate
113/// must go through the `GlobalContextServerManager` which holds
114/// a model to the ContextServerManager.
115pub struct ContextServerManager {
116    servers: HashMap<String, Arc<ContextServer>>,
117    pending_servers: HashSet<String>,
118}
119
/// Events emitted by `ContextServerManager` as servers come and go.
pub enum Event {
    /// A server was started and registered with the manager.
    ServerStarted { server_id: String },
    /// A server was removed from the manager.
    ServerStopped { server_id: String },
}
124
125impl Global for ContextServerManager {}
126impl EventEmitter<Event> for ContextServerManager {}
127
128impl Default for ContextServerManager {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134impl ContextServerManager {
135    pub fn new() -> Self {
136        Self {
137            servers: HashMap::default(),
138            pending_servers: HashSet::default(),
139        }
140    }
141    pub fn global(cx: &AppContext) -> Model<Self> {
142        cx.global::<GlobalContextServerManager>().0.clone()
143    }
144
145    pub fn add_server(
146        &mut self,
147        config: ServerConfig,
148        cx: &mut ModelContext<Self>,
149    ) -> Task<anyhow::Result<()>> {
150        let server_id = config.id.clone();
151        let server_id2 = config.id.clone();
152
153        if self.servers.contains_key(&server_id) || self.pending_servers.contains(&server_id) {
154            return Task::ready(Ok(()));
155        }
156
157        let task = cx.spawn(|this, mut cx| async move {
158            let server = Arc::new(ContextServer::new(config));
159            server.start(&cx).await?;
160            this.update(&mut cx, |this, cx| {
161                this.servers.insert(server_id.clone(), server);
162                this.pending_servers.remove(&server_id);
163                cx.emit(Event::ServerStarted {
164                    server_id: server_id.clone(),
165                });
166            })?;
167            Ok(())
168        });
169
170        self.pending_servers.insert(server_id2);
171        task
172    }
173
174    pub fn get_server(&self, id: &str) -> Option<Arc<ContextServer>> {
175        self.servers.get(id).cloned()
176    }
177
178    pub fn remove_server(
179        &mut self,
180        id: &str,
181        cx: &mut ModelContext<Self>,
182    ) -> Task<anyhow::Result<()>> {
183        let id = id.to_string();
184        cx.spawn(|this, mut cx| async move {
185            if let Some(server) = this.update(&mut cx, |this, _cx| this.servers.remove(&id))? {
186                server.stop().await?;
187            }
188            this.update(&mut cx, |this, cx| {
189                this.pending_servers.remove(&id);
190                cx.emit(Event::ServerStopped {
191                    server_id: id.clone(),
192                })
193            })?;
194            Ok(())
195        })
196    }
197
198    pub fn restart_server(
199        &mut self,
200        id: &str,
201        cx: &mut ModelContext<Self>,
202    ) -> Task<anyhow::Result<()>> {
203        let id = id.to_string();
204        cx.spawn(|this, mut cx| async move {
205            if let Some(server) = this.update(&mut cx, |this, _cx| this.servers.remove(&id))? {
206                server.stop().await?;
207                let config = server.config.clone();
208                let new_server = Arc::new(ContextServer::new(config));
209                new_server.start(&cx).await?;
210                this.update(&mut cx, |this, cx| {
211                    this.servers.insert(id.clone(), new_server);
212                    cx.emit(Event::ServerStopped {
213                        server_id: id.clone(),
214                    });
215                    cx.emit(Event::ServerStarted {
216                        server_id: id.clone(),
217                    });
218                })?;
219            }
220            Ok(())
221        })
222    }
223
224    pub fn servers(&self) -> Vec<Arc<ContextServer>> {
225        self.servers.values().cloned().collect()
226    }
227
228    pub fn model(cx: &mut AppContext) -> Model<Self> {
229        cx.new_model(|_cx| ContextServerManager::new())
230    }
231}
232
233pub struct GlobalContextServerManager(Model<ContextServerManager>);
234impl Global for GlobalContextServerManager {}
235
236impl GlobalContextServerManager {
237    fn register(cx: &mut AppContext) {
238        let model = ContextServerManager::model(cx);
239        cx.set_global(Self(model));
240    }
241}
242
243pub fn init(cx: &mut AppContext) {
244    ContextServerSettings::register(cx);
245    GlobalContextServerManager::register(cx);
246    cx.observe_global::<SettingsStore>(|cx| {
247        let manager = ContextServerManager::global(cx);
248        cx.update_model(&manager, |manager, cx| {
249            let settings = ContextServerSettings::get_global(cx);
250            let current_servers: HashMap<String, ServerConfig> = manager
251                .servers()
252                .into_iter()
253                .map(|server| (server.id.clone(), server.config.clone()))
254                .collect();
255
256            let new_servers = settings
257                .servers
258                .iter()
259                .map(|config| (config.id.clone(), config.clone()))
260                .collect::<HashMap<_, _>>();
261
262            let servers_to_add = new_servers
263                .values()
264                .filter(|config| !current_servers.contains_key(&config.id))
265                .cloned()
266                .collect::<Vec<_>>();
267
268            let servers_to_remove = current_servers
269                .keys()
270                .filter(|id| !new_servers.contains_key(*id))
271                .cloned()
272                .collect::<Vec<_>>();
273
274            log::trace!("servers_to_add={:?}", servers_to_add);
275            for config in servers_to_add {
276                manager.add_server(config, cx).detach_and_log_err(cx);
277            }
278
279            for id in servers_to_remove {
280                manager.remove_server(&id, cx).detach_and_log_err(cx);
281            }
282        })
283    })
284    .detach();
285}