headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  7use language::LanguageRegistry;
  8use node_runtime::NodeRuntime;
  9use postage::stream::Stream;
 10use project::Project;
 11use rpc::{proto, ErrorCode, TypedEnvelope};
 12use settings::Settings;
 13use std::{collections::HashMap, sync::Arc};
 14use util::{ResultExt, TryFutureExt};
 15
 16pub struct DevServer {
 17    client: Arc<Client>,
 18    app_state: AppState,
 19    remote_shutdown: bool,
 20    projects: HashMap<DevServerProjectId, Model<Project>>,
 21    _subscriptions: Vec<client::Subscription>,
 22    _maintain_connection: Task<Option<()>>,
 23}
 24
 25pub struct AppState {
 26    pub node_runtime: Arc<dyn NodeRuntime>,
 27    pub user_store: Model<UserStore>,
 28    pub languages: Arc<LanguageRegistry>,
 29    pub fs: Arc<dyn Fs>,
 30}
 31
 32struct GlobalDevServer(Model<DevServer>);
 33
 34impl Global for GlobalDevServer {}
 35
 36pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 37    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 38    cx.set_global(GlobalDevServer(dev_server.clone()));
 39
 40    #[cfg(not(target_os = "windows"))]
 41    {
 42        use signal_hook::consts::{SIGINT, SIGTERM};
 43        use signal_hook::iterator::Signals;
 44        // Set up a handler when the dev server is shut down
 45        // with ctrl-c or kill
 46        let (tx, rx) = futures::channel::oneshot::channel();
 47        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 48        std::thread::spawn({
 49            move || {
 50                if let Some(sig) = signals.forever().next() {
 51                    tx.send(sig).log_err();
 52                }
 53            }
 54        });
 55        cx.spawn(|cx| async move {
 56            if let Ok(sig) = rx.await {
 57                log::info!("received signal {sig:?}");
 58                cx.update(|cx| cx.quit()).log_err();
 59            }
 60        })
 61        .detach();
 62    }
 63
 64    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 65    cx.spawn(|cx| async move {
 66        client
 67            .authenticate_and_connect(false, &cx)
 68            .await
 69            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 70    })
 71}
 72
 73impl DevServer {
 74    pub fn global(cx: &AppContext) -> Model<DevServer> {
 75        cx.global::<GlobalDevServer>().0.clone()
 76    }
 77
 78    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 79        cx.on_app_quit(Self::app_will_quit).detach();
 80
 81        let maintain_connection = cx.spawn({
 82            let client = client.clone();
 83            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 84        });
 85
 86        DevServer {
 87            _subscriptions: vec![
 88                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 89                client.add_request_handler(
 90                    cx.weak_model(),
 91                    Self::handle_validate_dev_server_project_request,
 92                ),
 93                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
 94            ],
 95            _maintain_connection: maintain_connection,
 96            projects: Default::default(),
 97            remote_shutdown: false,
 98            app_state,
 99            client,
100        }
101    }
102
103    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
104        let request = if self.remote_shutdown {
105            None
106        } else {
107            Some(self.client.request(proto::ShutdownDevServer {}))
108        };
109        async move {
110            if let Some(request) = request {
111                request.await.log_err();
112            }
113        }
114    }
115
116    async fn handle_dev_server_instructions(
117        this: Model<Self>,
118        envelope: TypedEnvelope<proto::DevServerInstructions>,
119        _: Arc<Client>,
120        mut cx: AsyncAppContext,
121    ) -> Result<()> {
122        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
123            let removed_projects = this
124                .projects
125                .keys()
126                .filter(|dev_server_project_id| {
127                    !envelope
128                        .payload
129                        .projects
130                        .iter()
131                        .any(|p| p.id == dev_server_project_id.0)
132                })
133                .cloned()
134                .collect::<Vec<_>>();
135
136            let added_projects = envelope
137                .payload
138                .projects
139                .into_iter()
140                .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
141                .collect::<Vec<_>>();
142
143            (added_projects, removed_projects)
144        })?;
145
146        for dev_server_project in added_projects {
147            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
148        }
149
150        this.update(&mut cx, |this, cx| {
151            for old_project_id in &removed_projects_ids {
152                this.unshare_project(old_project_id, cx)?;
153            }
154            Ok::<(), anyhow::Error>(())
155        })??;
156        Ok(())
157    }
158
159    async fn handle_validate_dev_server_project_request(
160        this: Model<Self>,
161        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
162        _: Arc<Client>,
163        cx: AsyncAppContext,
164    ) -> Result<proto::Ack> {
165        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
166        let path = std::path::Path::new(&expanded);
167        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
168
169        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
170        if !path_exists {
171            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
172        }
173
174        Ok(proto::Ack {})
175    }
176
177    async fn handle_shutdown(
178        this: Model<Self>,
179        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
180        _: Arc<Client>,
181        mut cx: AsyncAppContext,
182    ) -> Result<()> {
183        this.update(&mut cx, |this, cx| {
184            this.remote_shutdown = true;
185            cx.quit();
186        })
187    }
188
189    fn unshare_project(
190        &mut self,
191        dev_server_project_id: &DevServerProjectId,
192        cx: &mut ModelContext<Self>,
193    ) -> Result<()> {
194        if let Some(project) = self.projects.remove(dev_server_project_id) {
195            project.update(cx, |project, cx| project.unshare(cx))?;
196        }
197        Ok(())
198    }
199
200    async fn share_project(
201        this: Model<Self>,
202        dev_server_project: &proto::DevServerProject,
203        cx: &mut AsyncAppContext,
204    ) -> Result<()> {
205        let (client, project) = this.update(cx, |this, cx| {
206            let project = Project::local(
207                this.client.clone(),
208                this.app_state.node_runtime.clone(),
209                this.app_state.user_store.clone(),
210                this.app_state.languages.clone(),
211                this.app_state.fs.clone(),
212                cx,
213            );
214
215            (this.client.clone(), project)
216        })?;
217
218        let path = shellexpand::tilde(&dev_server_project.path).to_string();
219
220        let (worktree, _) = project
221            .update(cx, |project, cx| {
222                project.find_or_create_local_worktree(&path, true, cx)
223            })?
224            .await?;
225
226        worktree.update(cx, |worktree, cx| {
227            worktree.as_local_mut().unwrap().share_private_files(cx)
228        })?;
229
230        let worktrees =
231            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
232
233        let response = client
234            .request(proto::ShareDevServerProject {
235                dev_server_project_id: dev_server_project.id,
236                worktrees,
237            })
238            .await?;
239
240        let project_id = response.project_id;
241        project.update(cx, |project, cx| project.shared(project_id, cx))??;
242        this.update(cx, |this, _| {
243            this.projects
244                .insert(DevServerProjectId(dev_server_project.id), project);
245        })?;
246        Ok(())
247    }
248
249    async fn maintain_connection(
250        this: WeakModel<Self>,
251        client: Arc<Client>,
252        mut cx: AsyncAppContext,
253    ) -> Result<()> {
254        let mut client_status = client.status();
255
256        let _ = client_status.try_recv();
257        let current_status = *client_status.borrow();
258        if current_status.is_connected() {
259            // wait for first disconnect
260            client_status.recv().await;
261        }
262
263        loop {
264            let Some(current_status) = client_status.recv().await else {
265                return Ok(());
266            };
267            let Some(this) = this.upgrade() else {
268                return Ok(());
269            };
270
271            if !current_status.is_connected() {
272                continue;
273            }
274
275            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
276        }
277    }
278
279    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
280        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
281        let request = self.client.request(proto::ReconnectDevServer {
282            reshared_projects: self
283                .projects
284                .iter()
285                .flat_map(|(_, handle)| {
286                    let project = handle.read(cx);
287                    let project_id = project.remote_id()?;
288                    projects.insert(project_id, handle.clone());
289                    Some(proto::UpdateProject {
290                        project_id,
291                        worktrees: project.worktree_metadata_protos(cx),
292                    })
293                })
294                .collect(),
295        });
296        cx.spawn(|_, mut cx| async move {
297            let response = request.await?;
298
299            for reshared_project in response.reshared_projects {
300                if let Some(project) = projects.get(&reshared_project.id) {
301                    project.update(&mut cx, |project, cx| {
302                        project.reshared(reshared_project, cx).log_err();
303                    })?;
304                }
305            }
306            Ok(())
307        })
308    }
309}