headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use extension::ExtensionStore;
  5use fs::Fs;
  6use futures::{Future, StreamExt};
  7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  8use language::LanguageRegistry;
  9use node_runtime::NodeRuntime;
 10use postage::stream::Stream;
 11use project::Project;
 12use rpc::{proto, ErrorCode, TypedEnvelope};
 13use settings::{Settings, SettingsStore};
 14use std::path::Path;
 15use std::{collections::HashMap, sync::Arc};
 16use util::{ResultExt, TryFutureExt};
 17
 18pub struct DevServer {
 19    client: Arc<Client>,
 20    app_state: AppState,
 21    remote_shutdown: bool,
 22    projects: HashMap<DevServerProjectId, Model<Project>>,
 23    _subscriptions: Vec<client::Subscription>,
 24    _maintain_connection: Task<Option<()>>,
 25}
 26
 27pub struct AppState {
 28    pub node_runtime: Arc<dyn NodeRuntime>,
 29    pub user_store: Model<UserStore>,
 30    pub languages: Arc<LanguageRegistry>,
 31    pub fs: Arc<dyn Fs>,
 32}
 33
 34struct GlobalDevServer(Model<DevServer>);
 35
 36impl Global for GlobalDevServer {}
 37
 38pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 39    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 40    cx.set_global(GlobalDevServer(dev_server.clone()));
 41
 42    #[cfg(not(target_os = "windows"))]
 43    {
 44        use signal_hook::consts::{SIGINT, SIGTERM};
 45        use signal_hook::iterator::Signals;
 46        // Set up a handler when the dev server is shut down
 47        // with ctrl-c or kill
 48        let (tx, rx) = futures::channel::oneshot::channel();
 49        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 50        std::thread::spawn({
 51            move || {
 52                if let Some(sig) = signals.forever().next() {
 53                    tx.send(sig).log_err();
 54                }
 55            }
 56        });
 57        cx.spawn(|cx| async move {
 58            if let Ok(sig) = rx.await {
 59                log::info!("received signal {sig:?}");
 60                cx.update(|cx| cx.quit()).log_err();
 61            }
 62        })
 63        .detach();
 64    }
 65
 66    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 67    cx.spawn(|cx| async move {
 68        client
 69            .authenticate_and_connect(false, &cx)
 70            .await
 71            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 72    })
 73}
 74
 75impl DevServer {
 76    pub fn global(cx: &AppContext) -> Model<DevServer> {
 77        cx.global::<GlobalDevServer>().0.clone()
 78    }
 79
 80    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 81        cx.on_app_quit(Self::app_will_quit).detach();
 82
 83        let maintain_connection = cx.spawn({
 84            let client = client.clone();
 85            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 86        });
 87
 88        cx.observe_global::<SettingsStore>(|_, cx| {
 89            ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
 90        })
 91        .detach();
 92
 93        DevServer {
 94            _subscriptions: vec![
 95                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 96                client.add_request_handler(
 97                    cx.weak_model(),
 98                    Self::handle_validate_dev_server_project_request,
 99                ),
100                client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory),
101                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
102            ],
103            _maintain_connection: maintain_connection,
104            projects: Default::default(),
105            remote_shutdown: false,
106            app_state,
107            client,
108        }
109    }
110
111    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
112        let request = if self.remote_shutdown {
113            None
114        } else {
115            Some(
116                self.client
117                    .request(proto::ShutdownDevServer { reason: None }),
118            )
119        };
120        async move {
121            if let Some(request) = request {
122                request.await.log_err();
123            }
124        }
125    }
126
127    async fn handle_dev_server_instructions(
128        this: Model<Self>,
129        envelope: TypedEnvelope<proto::DevServerInstructions>,
130        mut cx: AsyncAppContext,
131    ) -> Result<()> {
132        let (added_projects, retained_projects, removed_projects_ids) =
133            this.read_with(&mut cx, |this, _| {
134                let removed_projects = this
135                    .projects
136                    .keys()
137                    .filter(|dev_server_project_id| {
138                        !envelope
139                            .payload
140                            .projects
141                            .iter()
142                            .any(|p| p.id == dev_server_project_id.0)
143                    })
144                    .cloned()
145                    .collect::<Vec<_>>();
146
147                let mut added_projects = vec![];
148                let mut retained_projects = vec![];
149
150                for project in envelope.payload.projects.iter() {
151                    if this.projects.contains_key(&DevServerProjectId(project.id)) {
152                        retained_projects.push(project.clone());
153                    } else {
154                        added_projects.push(project.clone());
155                    }
156                }
157
158                (added_projects, retained_projects, removed_projects)
159            })?;
160
161        for dev_server_project in added_projects {
162            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
163        }
164
165        for dev_server_project in retained_projects {
166            DevServer::update_project(this.clone(), &dev_server_project, &mut cx).await?;
167        }
168
169        this.update(&mut cx, |this, cx| {
170            for old_project_id in &removed_projects_ids {
171                this.unshare_project(old_project_id, cx)?;
172            }
173            Ok::<(), anyhow::Error>(())
174        })??;
175        Ok(())
176    }
177
178    async fn handle_validate_dev_server_project_request(
179        this: Model<Self>,
180        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
181        cx: AsyncAppContext,
182    ) -> Result<proto::Ack> {
183        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184        let path = std::path::Path::new(&expanded);
185        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
188        if !path_exists {
189            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
190        }
191
192        Ok(proto::Ack {})
193    }
194
195    async fn handle_list_remote_directory(
196        this: Model<Self>,
197        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
198        cx: AsyncAppContext,
199    ) -> Result<proto::ListRemoteDirectoryResponse> {
200        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
201        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
202
203        let mut entries = Vec::new();
204        let mut response = fs.read_dir(Path::new(&expanded)).await?;
205        while let Some(path) = response.next().await {
206            if let Some(file_name) = path?.file_name() {
207                entries.push(file_name.to_string_lossy().to_string());
208            }
209        }
210        Ok(proto::ListRemoteDirectoryResponse { entries })
211    }
212
213    async fn handle_shutdown(
214        this: Model<Self>,
215        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
216        mut cx: AsyncAppContext,
217    ) -> Result<()> {
218        this.update(&mut cx, |this, cx| {
219            this.remote_shutdown = true;
220            cx.quit();
221        })
222    }
223
224    fn unshare_project(
225        &mut self,
226        dev_server_project_id: &DevServerProjectId,
227        cx: &mut ModelContext<Self>,
228    ) -> Result<()> {
229        if let Some(project) = self.projects.remove(dev_server_project_id) {
230            project.update(cx, |project, cx| project.unshare(cx))?;
231        }
232        Ok(())
233    }
234
235    async fn share_project(
236        this: Model<Self>,
237        dev_server_project: &proto::DevServerProject,
238        cx: &mut AsyncAppContext,
239    ) -> Result<()> {
240        let (client, project) = this.update(cx, |this, cx| {
241            let project = Project::local(
242                this.client.clone(),
243                this.app_state.node_runtime.clone(),
244                this.app_state.user_store.clone(),
245                this.app_state.languages.clone(),
246                this.app_state.fs.clone(),
247                cx,
248            );
249
250            (this.client.clone(), project)
251        })?;
252
253        for path in &dev_server_project.paths {
254            let path = shellexpand::tilde(path).to_string();
255
256            let (worktree, _) = project
257                .update(cx, |project, cx| {
258                    project.find_or_create_worktree(&path, true, cx)
259                })?
260                .await?;
261
262            worktree.update(cx, |worktree, cx| {
263                worktree.as_local_mut().unwrap().share_private_files(cx)
264            })?;
265        }
266
267        let worktrees =
268            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
269
270        let response = client
271            .request(proto::ShareDevServerProject {
272                dev_server_project_id: dev_server_project.id,
273                worktrees,
274            })
275            .await?;
276
277        let project_id = response.project_id;
278        project.update(cx, |project, cx| project.shared(project_id, cx))??;
279        this.update(cx, |this, _| {
280            this.projects
281                .insert(DevServerProjectId(dev_server_project.id), project);
282        })?;
283        Ok(())
284    }
285
286    async fn update_project(
287        this: Model<Self>,
288        dev_server_project: &proto::DevServerProject,
289        cx: &mut AsyncAppContext,
290    ) -> Result<()> {
291        let tasks = this.update(cx, |this, cx| {
292            let Some(project) = this
293                .projects
294                .get(&DevServerProjectId(dev_server_project.id))
295            else {
296                return vec![];
297            };
298
299            let mut to_delete = vec![];
300            let mut tasks = vec![];
301
302            project.update(cx, |project, cx| {
303                for worktree in project.visible_worktrees(cx) {
304                    let mut delete = true;
305                    for config in dev_server_project.paths.iter() {
306                        if worktree.read(cx).abs_path().to_string_lossy()
307                            == shellexpand::tilde(config)
308                        {
309                            delete = false;
310                        }
311                    }
312                    if delete {
313                        to_delete.push(worktree.read(cx).id())
314                    }
315                }
316
317                for worktree_id in to_delete {
318                    project.remove_worktree(worktree_id, cx)
319                }
320
321                for config in dev_server_project.paths.iter() {
322                    tasks.push(project.find_or_create_worktree(
323                        &shellexpand::tilde(config).to_string(),
324                        true,
325                        cx,
326                    ));
327                }
328
329                tasks
330            })
331        })?;
332        futures::future::join_all(tasks).await;
333        Ok(())
334    }
335
336    async fn maintain_connection(
337        this: WeakModel<Self>,
338        client: Arc<Client>,
339        mut cx: AsyncAppContext,
340    ) -> Result<()> {
341        let mut client_status = client.status();
342
343        let _ = client_status.try_recv();
344        let current_status = *client_status.borrow();
345        if current_status.is_connected() {
346            // wait for first disconnect
347            client_status.recv().await;
348        }
349
350        loop {
351            let Some(current_status) = client_status.recv().await else {
352                return Ok(());
353            };
354            let Some(this) = this.upgrade() else {
355                return Ok(());
356            };
357
358            if !current_status.is_connected() {
359                continue;
360            }
361
362            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
363        }
364    }
365
366    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
368        let request = self.client.request(proto::ReconnectDevServer {
369            reshared_projects: self
370                .projects
371                .iter()
372                .flat_map(|(_, handle)| {
373                    let project = handle.read(cx);
374                    let project_id = project.remote_id()?;
375                    projects.insert(project_id, handle.clone());
376                    Some(proto::UpdateProject {
377                        project_id,
378                        worktrees: project.worktree_metadata_protos(cx),
379                    })
380                })
381                .collect(),
382        });
383        cx.spawn(|_, mut cx| async move {
384            let response = request.await?;
385
386            for reshared_project in response.reshared_projects {
387                if let Some(project) = projects.get(&reshared_project.id) {
388                    project.update(&mut cx, |project, cx| {
389                        project.reshared(reshared_project, cx).log_err();
390                    })?;
391                }
392            }
393            Ok(())
394        })
395    }
396}