headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  7use language::LanguageRegistry;
  8use node_runtime::NodeRuntime;
  9use postage::stream::Stream;
 10use project::Project;
 11use rpc::{proto, ErrorCode, TypedEnvelope};
 12use settings::Settings;
 13use std::{collections::HashMap, sync::Arc};
 14use util::{ResultExt, TryFutureExt};
 15
 16pub struct DevServer {
 17    client: Arc<Client>,
 18    app_state: AppState,
 19    remote_shutdown: bool,
 20    projects: HashMap<DevServerProjectId, Model<Project>>,
 21    _subscriptions: Vec<client::Subscription>,
 22    _maintain_connection: Task<Option<()>>,
 23}
 24
 25pub struct AppState {
 26    pub node_runtime: Arc<dyn NodeRuntime>,
 27    pub user_store: Model<UserStore>,
 28    pub languages: Arc<LanguageRegistry>,
 29    pub fs: Arc<dyn Fs>,
 30}
 31
 32struct GlobalDevServer(Model<DevServer>);
 33
 34impl Global for GlobalDevServer {}
 35
 36pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 37    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 38    cx.set_global(GlobalDevServer(dev_server.clone()));
 39
 40    #[cfg(not(target_os = "windows"))]
 41    {
 42        use signal_hook::consts::{SIGINT, SIGTERM};
 43        use signal_hook::iterator::Signals;
 44        // Set up a handler when the dev server is shut down
 45        // with ctrl-c or kill
 46        let (tx, rx) = futures::channel::oneshot::channel();
 47        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 48        std::thread::spawn({
 49            move || {
 50                if let Some(sig) = signals.forever().next() {
 51                    tx.send(sig).log_err();
 52                }
 53            }
 54        });
 55        cx.spawn(|cx| async move {
 56            if let Ok(sig) = rx.await {
 57                log::info!("received signal {sig:?}");
 58                cx.update(|cx| cx.quit()).log_err();
 59            }
 60        })
 61        .detach();
 62    }
 63
 64    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 65    cx.spawn(|cx| async move {
 66        client
 67            .authenticate_and_connect(false, &cx)
 68            .await
 69            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 70    })
 71}
 72
 73impl DevServer {
 74    pub fn global(cx: &AppContext) -> Model<DevServer> {
 75        cx.global::<GlobalDevServer>().0.clone()
 76    }
 77
 78    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 79        cx.on_app_quit(Self::app_will_quit).detach();
 80
 81        let maintain_connection = cx.spawn({
 82            let client = client.clone();
 83            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 84        });
 85
 86        DevServer {
 87            _subscriptions: vec![
 88                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 89                client.add_request_handler(
 90                    cx.weak_model(),
 91                    Self::handle_validate_dev_server_project_request,
 92                ),
 93                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
 94            ],
 95            _maintain_connection: maintain_connection,
 96            projects: Default::default(),
 97            remote_shutdown: false,
 98            app_state,
 99            client,
100        }
101    }
102
103    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
104        let request = if self.remote_shutdown {
105            None
106        } else {
107            Some(
108                self.client
109                    .request(proto::ShutdownDevServer { reason: None }),
110            )
111        };
112        async move {
113            if let Some(request) = request {
114                request.await.log_err();
115            }
116        }
117    }
118
119    async fn handle_dev_server_instructions(
120        this: Model<Self>,
121        envelope: TypedEnvelope<proto::DevServerInstructions>,
122        mut cx: AsyncAppContext,
123    ) -> Result<()> {
124        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
125            let removed_projects = this
126                .projects
127                .keys()
128                .filter(|dev_server_project_id| {
129                    !envelope
130                        .payload
131                        .projects
132                        .iter()
133                        .any(|p| p.id == dev_server_project_id.0)
134                })
135                .cloned()
136                .collect::<Vec<_>>();
137
138            let added_projects = envelope
139                .payload
140                .projects
141                .into_iter()
142                .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
143                .collect::<Vec<_>>();
144
145            (added_projects, removed_projects)
146        })?;
147
148        for dev_server_project in added_projects {
149            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
150        }
151
152        this.update(&mut cx, |this, cx| {
153            for old_project_id in &removed_projects_ids {
154                this.unshare_project(old_project_id, cx)?;
155            }
156            Ok::<(), anyhow::Error>(())
157        })??;
158        Ok(())
159    }
160
161    async fn handle_validate_dev_server_project_request(
162        this: Model<Self>,
163        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
164        cx: AsyncAppContext,
165    ) -> Result<proto::Ack> {
166        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
167        let path = std::path::Path::new(&expanded);
168        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
169
170        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
171        if !path_exists {
172            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
173        }
174
175        Ok(proto::Ack {})
176    }
177
178    async fn handle_shutdown(
179        this: Model<Self>,
180        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
181        mut cx: AsyncAppContext,
182    ) -> Result<()> {
183        this.update(&mut cx, |this, cx| {
184            this.remote_shutdown = true;
185            cx.quit();
186        })
187    }
188
189    fn unshare_project(
190        &mut self,
191        dev_server_project_id: &DevServerProjectId,
192        cx: &mut ModelContext<Self>,
193    ) -> Result<()> {
194        if let Some(project) = self.projects.remove(dev_server_project_id) {
195            project.update(cx, |project, cx| project.unshare(cx))?;
196        }
197        Ok(())
198    }
199
200    async fn share_project(
201        this: Model<Self>,
202        dev_server_project: &proto::DevServerProject,
203        cx: &mut AsyncAppContext,
204    ) -> Result<()> {
205        let (client, project) = this.update(cx, |this, cx| {
206            let project = Project::local(
207                this.client.clone(),
208                this.app_state.node_runtime.clone(),
209                this.app_state.user_store.clone(),
210                this.app_state.languages.clone(),
211                this.app_state.fs.clone(),
212                cx,
213            );
214
215            (this.client.clone(), project)
216        })?;
217
218        let path = shellexpand::tilde(&dev_server_project.path).to_string();
219
220        let (worktree, _) = project
221            .update(cx, |project, cx| {
222                project.find_or_create_local_worktree(&path, true, cx)
223            })?
224            .await?;
225
226        worktree.update(cx, |worktree, cx| {
227            worktree.as_local_mut().unwrap().share_private_files(cx)
228        })?;
229
230        let worktrees =
231            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
232
233        let response = client
234            .request(proto::ShareDevServerProject {
235                dev_server_project_id: dev_server_project.id,
236                worktrees,
237            })
238            .await?;
239
240        let project_id = response.project_id;
241        project.update(cx, |project, cx| project.shared(project_id, cx))??;
242        this.update(cx, |this, _| {
243            this.projects
244                .insert(DevServerProjectId(dev_server_project.id), project);
245        })?;
246        Ok(())
247    }
248
249    async fn maintain_connection(
250        this: WeakModel<Self>,
251        client: Arc<Client>,
252        mut cx: AsyncAppContext,
253    ) -> Result<()> {
254        let mut client_status = client.status();
255
256        let _ = client_status.try_recv();
257        let current_status = *client_status.borrow();
258        if current_status.is_connected() {
259            // wait for first disconnect
260            client_status.recv().await;
261        }
262
263        loop {
264            let Some(current_status) = client_status.recv().await else {
265                return Ok(());
266            };
267            let Some(this) = this.upgrade() else {
268                return Ok(());
269            };
270
271            if !current_status.is_connected() {
272                continue;
273            }
274
275            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
276        }
277    }
278
279    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
280        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
281        let request = self.client.request(proto::ReconnectDevServer {
282            reshared_projects: self
283                .projects
284                .iter()
285                .flat_map(|(_, handle)| {
286                    let project = handle.read(cx);
287                    let project_id = project.remote_id()?;
288                    projects.insert(project_id, handle.clone());
289                    Some(proto::UpdateProject {
290                        project_id,
291                        worktrees: project.worktree_metadata_protos(cx),
292                    })
293                })
294                .collect(),
295        });
296        cx.spawn(|_, mut cx| async move {
297            let response = request.await?;
298
299            for reshared_project in response.reshared_projects {
300                if let Some(project) = projects.get(&reshared_project.id) {
301                    project.update(&mut cx, |project, cx| {
302                        project.reshared(reshared_project, cx).log_err();
303                    })?;
304                }
305            }
306            Ok(())
307        })
308    }
309}