headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{
  7    AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, UpdateGlobal,
  8    WeakModel,
  9};
 10use language::LanguageRegistry;
 11use node_runtime::NodeRuntime;
 12use postage::stream::Stream;
 13use project::{Project, WorktreeSettings};
 14use rpc::{proto, ErrorCode, TypedEnvelope};
 15use settings::{Settings, SettingsStore};
 16use std::{collections::HashMap, sync::Arc};
 17use util::{ResultExt, TryFutureExt};
 18
 19pub struct DevServer {
 20    client: Arc<Client>,
 21    app_state: AppState,
 22    remote_shutdown: bool,
 23    projects: HashMap<DevServerProjectId, Model<Project>>,
 24    _subscriptions: Vec<client::Subscription>,
 25    _maintain_connection: Task<Option<()>>,
 26}
 27
 28pub struct AppState {
 29    pub node_runtime: Arc<dyn NodeRuntime>,
 30    pub user_store: Model<UserStore>,
 31    pub languages: Arc<LanguageRegistry>,
 32    pub fs: Arc<dyn Fs>,
 33}
 34
 35struct GlobalDevServer(Model<DevServer>);
 36
 37impl Global for GlobalDevServer {}
 38
 39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 40    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 41    cx.set_global(GlobalDevServer(dev_server.clone()));
 42
 43    // Dev server cannot have any private files for now
 44    SettingsStore::update_global(cx, |store, _cx| {
 45        let old_settings = store.get::<WorktreeSettings>(None);
 46        store.override_global(WorktreeSettings {
 47            private_files: Some(vec![]),
 48            ..old_settings.clone()
 49        });
 50    });
 51
 52    #[cfg(not(target_os = "windows"))]
 53    {
 54        use signal_hook::consts::{SIGINT, SIGTERM};
 55        use signal_hook::iterator::Signals;
 56        // Set up a handler when the dev server is shut down
 57        // with ctrl-c or kill
 58        let (tx, rx) = futures::channel::oneshot::channel();
 59        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 60        std::thread::spawn({
 61            move || {
 62                if let Some(sig) = signals.forever().next() {
 63                    tx.send(sig).log_err();
 64                }
 65            }
 66        });
 67        cx.spawn(|cx| async move {
 68            if let Ok(sig) = rx.await {
 69                log::info!("received signal {sig:?}");
 70                cx.update(|cx| cx.quit()).log_err();
 71            }
 72        })
 73        .detach();
 74    }
 75
 76    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 77    cx.spawn(|cx| async move {
 78        client
 79            .authenticate_and_connect(false, &cx)
 80            .await
 81            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 82    })
 83}
 84
 85impl DevServer {
 86    pub fn global(cx: &AppContext) -> Model<DevServer> {
 87        cx.global::<GlobalDevServer>().0.clone()
 88    }
 89
 90    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 91        cx.on_app_quit(Self::app_will_quit).detach();
 92
 93        let maintain_connection = cx.spawn({
 94            let client = client.clone();
 95            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 96        });
 97
 98        DevServer {
 99            _subscriptions: vec![
100                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
101                client.add_request_handler(
102                    cx.weak_model(),
103                    Self::handle_validate_dev_server_project_request,
104                ),
105                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
106            ],
107            _maintain_connection: maintain_connection,
108            projects: Default::default(),
109            remote_shutdown: false,
110            app_state,
111            client,
112        }
113    }
114
115    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
116        let request = if self.remote_shutdown {
117            None
118        } else {
119            Some(self.client.request(proto::ShutdownDevServer {}))
120        };
121        async move {
122            if let Some(request) = request {
123                request.await.log_err();
124            }
125        }
126    }
127
128    async fn handle_dev_server_instructions(
129        this: Model<Self>,
130        envelope: TypedEnvelope<proto::DevServerInstructions>,
131        _: Arc<Client>,
132        mut cx: AsyncAppContext,
133    ) -> Result<()> {
134        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
135            let removed_projects = this
136                .projects
137                .keys()
138                .filter(|dev_server_project_id| {
139                    !envelope
140                        .payload
141                        .projects
142                        .iter()
143                        .any(|p| p.id == dev_server_project_id.0)
144                })
145                .cloned()
146                .collect::<Vec<_>>();
147
148            let added_projects = envelope
149                .payload
150                .projects
151                .into_iter()
152                .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
153                .collect::<Vec<_>>();
154
155            (added_projects, removed_projects)
156        })?;
157
158        for dev_server_project in added_projects {
159            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
160        }
161
162        this.update(&mut cx, |this, cx| {
163            for old_project_id in &removed_projects_ids {
164                this.unshare_project(old_project_id, cx)?;
165            }
166            Ok::<(), anyhow::Error>(())
167        })??;
168        Ok(())
169    }
170
171    async fn handle_validate_dev_server_project_request(
172        this: Model<Self>,
173        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
174        _: Arc<Client>,
175        cx: AsyncAppContext,
176    ) -> Result<proto::Ack> {
177        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
178        let path = std::path::Path::new(&expanded);
179        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
180
181        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
182        if !path_exists {
183            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
184        }
185
186        Ok(proto::Ack {})
187    }
188
189    async fn handle_shutdown(
190        this: Model<Self>,
191        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
192        _: Arc<Client>,
193        mut cx: AsyncAppContext,
194    ) -> Result<()> {
195        this.update(&mut cx, |this, cx| {
196            this.remote_shutdown = true;
197            cx.quit();
198        })
199    }
200
201    fn unshare_project(
202        &mut self,
203        dev_server_project_id: &DevServerProjectId,
204        cx: &mut ModelContext<Self>,
205    ) -> Result<()> {
206        if let Some(project) = self.projects.remove(dev_server_project_id) {
207            project.update(cx, |project, cx| project.unshare(cx))?;
208        }
209        Ok(())
210    }
211
212    async fn share_project(
213        this: Model<Self>,
214        dev_server_project: &proto::DevServerProject,
215        cx: &mut AsyncAppContext,
216    ) -> Result<()> {
217        let (client, project) = this.update(cx, |this, cx| {
218            let project = Project::local(
219                this.client.clone(),
220                this.app_state.node_runtime.clone(),
221                this.app_state.user_store.clone(),
222                this.app_state.languages.clone(),
223                this.app_state.fs.clone(),
224                cx,
225            );
226
227            (this.client.clone(), project)
228        })?;
229
230        let path = shellexpand::tilde(&dev_server_project.path).to_string();
231
232        project
233            .update(cx, |project, cx| {
234                project.find_or_create_local_worktree(&path, true, cx)
235            })?
236            .await?;
237
238        let worktrees =
239            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
240
241        let response = client
242            .request(proto::ShareDevServerProject {
243                dev_server_project_id: dev_server_project.id,
244                worktrees,
245            })
246            .await?;
247
248        let project_id = response.project_id;
249        project.update(cx, |project, cx| project.shared(project_id, cx))??;
250        this.update(cx, |this, _| {
251            this.projects
252                .insert(DevServerProjectId(dev_server_project.id), project);
253        })?;
254        Ok(())
255    }
256
257    async fn maintain_connection(
258        this: WeakModel<Self>,
259        client: Arc<Client>,
260        mut cx: AsyncAppContext,
261    ) -> Result<()> {
262        let mut client_status = client.status();
263
264        let _ = client_status.try_recv();
265        let current_status = *client_status.borrow();
266        if current_status.is_connected() {
267            // wait for first disconnect
268            client_status.recv().await;
269        }
270
271        loop {
272            let Some(current_status) = client_status.recv().await else {
273                return Ok(());
274            };
275            let Some(this) = this.upgrade() else {
276                return Ok(());
277            };
278
279            if !current_status.is_connected() {
280                continue;
281            }
282
283            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
284        }
285    }
286
287    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
288        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
289        let request = self.client.request(proto::ReconnectDevServer {
290            reshared_projects: self
291                .projects
292                .iter()
293                .flat_map(|(_, handle)| {
294                    let project = handle.read(cx);
295                    let project_id = project.remote_id()?;
296                    projects.insert(project_id, handle.clone());
297                    Some(proto::UpdateProject {
298                        project_id,
299                        worktrees: project.worktree_metadata_protos(cx),
300                    })
301                })
302                .collect(),
303        });
304        cx.spawn(|_, mut cx| async move {
305            let response = request.await?;
306
307            for reshared_project in response.reshared_projects {
308                if let Some(project) = projects.get(&reshared_project.id) {
309                    project.update(&mut cx, |project, cx| {
310                        project.reshared(reshared_project, cx).log_err();
311                    })?;
312                }
313            }
314            Ok(())
315        })
316    }
317}