headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use extension::ExtensionStore;
  5use fs::Fs;
  6use futures::{Future, StreamExt};
  7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  8use language::LanguageRegistry;
  9use node_runtime::NodeRuntime;
 10use postage::stream::Stream;
 11use project::Project;
 12use rpc::{proto, ErrorCode, TypedEnvelope};
 13use settings::{Settings, SettingsStore};
 14use std::path::Path;
 15use std::{collections::HashMap, sync::Arc};
 16use util::{ResultExt, TryFutureExt};
 17
 18pub struct DevServer {
 19    client: Arc<Client>,
 20    app_state: AppState,
 21    remote_shutdown: bool,
 22    projects: HashMap<DevServerProjectId, Model<Project>>,
 23    _subscriptions: Vec<client::Subscription>,
 24    _maintain_connection: Task<Option<()>>,
 25}
 26
 27pub struct AppState {
 28    pub node_runtime: Arc<dyn NodeRuntime>,
 29    pub user_store: Model<UserStore>,
 30    pub languages: Arc<LanguageRegistry>,
 31    pub fs: Arc<dyn Fs>,
 32}
 33
 34struct GlobalDevServer(Model<DevServer>);
 35
 36impl Global for GlobalDevServer {}
 37
 38pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 39    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 40    cx.set_global(GlobalDevServer(dev_server.clone()));
 41
 42    #[cfg(not(target_os = "windows"))]
 43    {
 44        use signal_hook::consts::{SIGINT, SIGTERM};
 45        use signal_hook::iterator::Signals;
 46        // Set up a handler when the dev server is shut down
 47        // with ctrl-c or kill
 48        let (tx, rx) = futures::channel::oneshot::channel();
 49        let mut signals = Signals::new([SIGTERM, SIGINT]).unwrap();
 50        std::thread::spawn({
 51            move || {
 52                if let Some(sig) = signals.forever().next() {
 53                    tx.send(sig).log_err();
 54                }
 55            }
 56        });
 57        cx.spawn(|cx| async move {
 58            if let Ok(sig) = rx.await {
 59                log::info!("received signal {sig:?}");
 60                cx.update(|cx| cx.quit()).log_err();
 61            }
 62        })
 63        .detach();
 64    }
 65
 66    let server_url = ClientSettings::get_global(cx).server_url.clone();
 67    cx.spawn(|cx| async move {
 68        client
 69            .authenticate_and_connect(false, &cx)
 70            .await
 71            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 72    })
 73}
 74
 75impl DevServer {
 76    pub fn global(cx: &AppContext) -> Model<DevServer> {
 77        cx.global::<GlobalDevServer>().0.clone()
 78    }
 79
 80    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 81        cx.on_app_quit(Self::app_will_quit).detach();
 82
 83        let maintain_connection = cx.spawn({
 84            let client = client.clone();
 85            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 86        });
 87
 88        cx.observe_global::<SettingsStore>(|_, cx| {
 89            ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
 90        })
 91        .detach();
 92
 93        DevServer {
 94            _subscriptions: vec![
 95                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 96                client.add_request_handler(
 97                    cx.weak_model(),
 98                    Self::handle_validate_dev_server_project_request,
 99                ),
100                client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory),
101                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
102            ],
103            _maintain_connection: maintain_connection,
104            projects: Default::default(),
105            remote_shutdown: false,
106            app_state,
107            client,
108        }
109    }
110
111    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
112        let request = if self.remote_shutdown {
113            None
114        } else {
115            Some(
116                self.client
117                    .request(proto::ShutdownDevServer { reason: None }),
118            )
119        };
120        async move {
121            if let Some(request) = request {
122                request.await.log_err();
123            }
124        }
125    }
126
127    async fn handle_dev_server_instructions(
128        this: Model<Self>,
129        envelope: TypedEnvelope<proto::DevServerInstructions>,
130        mut cx: AsyncAppContext,
131    ) -> Result<()> {
132        let (added_projects, retained_projects, removed_projects_ids) =
133            this.read_with(&mut cx, |this, _| {
134                let removed_projects = this
135                    .projects
136                    .keys()
137                    .filter(|dev_server_project_id| {
138                        !envelope
139                            .payload
140                            .projects
141                            .iter()
142                            .any(|p| p.id == dev_server_project_id.0)
143                    })
144                    .cloned()
145                    .collect::<Vec<_>>();
146
147                let mut added_projects = vec![];
148                let mut retained_projects = vec![];
149
150                for project in envelope.payload.projects.iter() {
151                    if this.projects.contains_key(&DevServerProjectId(project.id)) {
152                        retained_projects.push(project.clone());
153                    } else {
154                        added_projects.push(project.clone());
155                    }
156                }
157
158                (added_projects, retained_projects, removed_projects)
159            })?;
160
161        for dev_server_project in added_projects {
162            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
163        }
164
165        for dev_server_project in retained_projects {
166            DevServer::update_project(this.clone(), &dev_server_project, &mut cx).await?;
167        }
168
169        this.update(&mut cx, |this, cx| {
170            for old_project_id in &removed_projects_ids {
171                this.unshare_project(old_project_id, cx)?;
172            }
173            Ok::<(), anyhow::Error>(())
174        })??;
175        Ok(())
176    }
177
178    async fn handle_validate_dev_server_project_request(
179        this: Model<Self>,
180        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
181        cx: AsyncAppContext,
182    ) -> Result<proto::Ack> {
183        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184        let path = std::path::Path::new(&expanded);
185        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
188        if !path_exists {
189            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
190        }
191
192        Ok(proto::Ack {})
193    }
194
195    async fn handle_list_remote_directory(
196        this: Model<Self>,
197        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
198        cx: AsyncAppContext,
199    ) -> Result<proto::ListRemoteDirectoryResponse> {
200        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
201        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
202
203        let mut entries = Vec::new();
204        let mut response = fs.read_dir(Path::new(&expanded)).await?;
205        while let Some(path) = response.next().await {
206            if let Some(file_name) = path?.file_name() {
207                entries.push(file_name.to_string_lossy().to_string());
208            }
209        }
210        Ok(proto::ListRemoteDirectoryResponse { entries })
211    }
212
213    async fn handle_shutdown(
214        this: Model<Self>,
215        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
216        mut cx: AsyncAppContext,
217    ) -> Result<()> {
218        this.update(&mut cx, |this, cx| {
219            this.remote_shutdown = true;
220            cx.quit();
221        })
222    }
223
224    fn unshare_project(
225        &mut self,
226        dev_server_project_id: &DevServerProjectId,
227        cx: &mut ModelContext<Self>,
228    ) -> Result<()> {
229        if let Some(project) = self.projects.remove(dev_server_project_id) {
230            project.update(cx, |project, cx| project.unshare(cx))?;
231        }
232        Ok(())
233    }
234
235    async fn share_project(
236        this: Model<Self>,
237        dev_server_project: &proto::DevServerProject,
238        cx: &mut AsyncAppContext,
239    ) -> Result<()> {
240        let (client, project) = this.update(cx, |this, cx| {
241            let project = Project::local(
242                this.client.clone(),
243                this.app_state.node_runtime.clone(),
244                this.app_state.user_store.clone(),
245                this.app_state.languages.clone(),
246                this.app_state.fs.clone(),
247                None,
248                cx,
249            );
250
251            (this.client.clone(), project)
252        })?;
253
254        for path in &dev_server_project.paths {
255            let path = shellexpand::tilde(path).to_string();
256
257            let (worktree, _) = project
258                .update(cx, |project, cx| {
259                    project.find_or_create_worktree(&path, true, cx)
260                })?
261                .await?;
262
263            worktree.update(cx, |worktree, cx| {
264                worktree.as_local_mut().unwrap().share_private_files(cx)
265            })?;
266        }
267
268        let worktrees =
269            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
270
271        let response = client
272            .request(proto::ShareDevServerProject {
273                dev_server_project_id: dev_server_project.id,
274                worktrees,
275            })
276            .await?;
277
278        let project_id = response.project_id;
279        project.update(cx, |project, cx| project.shared(project_id, cx))??;
280        this.update(cx, |this, _| {
281            this.projects
282                .insert(DevServerProjectId(dev_server_project.id), project);
283        })?;
284        Ok(())
285    }
286
287    async fn update_project(
288        this: Model<Self>,
289        dev_server_project: &proto::DevServerProject,
290        cx: &mut AsyncAppContext,
291    ) -> Result<()> {
292        let tasks = this.update(cx, |this, cx| {
293            let Some(project) = this
294                .projects
295                .get(&DevServerProjectId(dev_server_project.id))
296            else {
297                return vec![];
298            };
299
300            let mut to_delete = vec![];
301            let mut tasks = vec![];
302
303            project.update(cx, |project, cx| {
304                for worktree in project.visible_worktrees(cx) {
305                    let mut delete = true;
306                    for config in dev_server_project.paths.iter() {
307                        if worktree.read(cx).abs_path().to_string_lossy()
308                            == shellexpand::tilde(config)
309                        {
310                            delete = false;
311                        }
312                    }
313                    if delete {
314                        to_delete.push(worktree.read(cx).id())
315                    }
316                }
317
318                for worktree_id in to_delete {
319                    project.remove_worktree(worktree_id, cx)
320                }
321
322                for config in dev_server_project.paths.iter() {
323                    tasks.push(project.find_or_create_worktree(
324                        shellexpand::tilde(config).to_string(),
325                        true,
326                        cx,
327                    ));
328                }
329
330                tasks
331            })
332        })?;
333        futures::future::join_all(tasks).await;
334        Ok(())
335    }
336
337    async fn maintain_connection(
338        this: WeakModel<Self>,
339        client: Arc<Client>,
340        mut cx: AsyncAppContext,
341    ) -> Result<()> {
342        let mut client_status = client.status();
343
344        let _ = client_status.try_recv();
345        let current_status = *client_status.borrow();
346        if current_status.is_connected() {
347            // wait for first disconnect
348            client_status.recv().await;
349        }
350
351        loop {
352            let Some(current_status) = client_status.recv().await else {
353                return Ok(());
354            };
355            let Some(this) = this.upgrade() else {
356                return Ok(());
357            };
358
359            if !current_status.is_connected() {
360                continue;
361            }
362
363            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
364        }
365    }
366
367    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
368        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
369        let request = self.client.request(proto::ReconnectDevServer {
370            reshared_projects: self
371                .projects
372                .iter()
373                .flat_map(|(_, handle)| {
374                    let project = handle.read(cx);
375                    let project_id = project.remote_id()?;
376                    projects.insert(project_id, handle.clone());
377                    Some(proto::UpdateProject {
378                        project_id,
379                        worktrees: project.worktree_metadata_protos(cx),
380                    })
381                })
382                .collect(),
383        });
384        cx.spawn(|_, mut cx| async move {
385            let response = request.await?;
386
387            for reshared_project in response.reshared_projects {
388                if let Some(project) = projects.get(&reshared_project.id) {
389                    project.update(&mut cx, |project, cx| {
390                        project.reshared(reshared_project, cx).log_err();
391                    })?;
392                }
393            }
394            Ok(())
395        })
396    }
397}