headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  7use language::LanguageRegistry;
  8use node_runtime::NodeRuntime;
  9use postage::stream::Stream;
 10use project::Project;
 11use rpc::{proto, ErrorCode, TypedEnvelope};
 12use settings::Settings;
 13use std::{collections::HashMap, sync::Arc};
 14use util::{ResultExt, TryFutureExt};
 15
 16pub struct DevServer {
 17    client: Arc<Client>,
 18    app_state: AppState,
 19    remote_shutdown: bool,
 20    projects: HashMap<DevServerProjectId, Model<Project>>,
 21    _subscriptions: Vec<client::Subscription>,
 22    _maintain_connection: Task<Option<()>>,
 23}
 24
 25pub struct AppState {
 26    pub node_runtime: Arc<dyn NodeRuntime>,
 27    pub user_store: Model<UserStore>,
 28    pub languages: Arc<LanguageRegistry>,
 29    pub fs: Arc<dyn Fs>,
 30}
 31
 32struct GlobalDevServer(Model<DevServer>);
 33
 34impl Global for GlobalDevServer {}
 35
 36pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 37    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 38    cx.set_global(GlobalDevServer(dev_server.clone()));
 39
 40    #[cfg(not(target_os = "windows"))]
 41    {
 42        use signal_hook::consts::{SIGINT, SIGTERM};
 43        use signal_hook::iterator::Signals;
 44        // Set up a handler when the dev server is shut down
 45        // with ctrl-c or kill
 46        let (tx, rx) = futures::channel::oneshot::channel();
 47        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 48        std::thread::spawn({
 49            move || {
 50                if let Some(sig) = signals.forever().next() {
 51                    tx.send(sig).log_err();
 52                }
 53            }
 54        });
 55        cx.spawn(|cx| async move {
 56            if let Ok(sig) = rx.await {
 57                log::info!("received signal {sig:?}");
 58                cx.update(|cx| cx.quit()).log_err();
 59            }
 60        })
 61        .detach();
 62    }
 63
 64    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 65    cx.spawn(|cx| async move {
 66        client
 67            .authenticate_and_connect(false, &cx)
 68            .await
 69            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 70    })
 71}
 72
 73impl DevServer {
 74    pub fn global(cx: &AppContext) -> Model<DevServer> {
 75        cx.global::<GlobalDevServer>().0.clone()
 76    }
 77
 78    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 79        cx.on_app_quit(Self::app_will_quit).detach();
 80
 81        let maintain_connection = cx.spawn({
 82            let client = client.clone();
 83            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 84        });
 85
 86        DevServer {
 87            _subscriptions: vec![
 88                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 89                client.add_request_handler(
 90                    cx.weak_model(),
 91                    Self::handle_validate_dev_server_project_request,
 92                ),
 93                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
 94            ],
 95            _maintain_connection: maintain_connection,
 96            projects: Default::default(),
 97            remote_shutdown: false,
 98            app_state,
 99            client,
100        }
101    }
102
103    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
104        let request = if self.remote_shutdown {
105            None
106        } else {
107            Some(
108                self.client
109                    .request(proto::ShutdownDevServer { reason: None }),
110            )
111        };
112        async move {
113            if let Some(request) = request {
114                request.await.log_err();
115            }
116        }
117    }
118
119    async fn handle_dev_server_instructions(
120        this: Model<Self>,
121        envelope: TypedEnvelope<proto::DevServerInstructions>,
122        _: Arc<Client>,
123        mut cx: AsyncAppContext,
124    ) -> Result<()> {
125        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
126            let removed_projects = this
127                .projects
128                .keys()
129                .filter(|dev_server_project_id| {
130                    !envelope
131                        .payload
132                        .projects
133                        .iter()
134                        .any(|p| p.id == dev_server_project_id.0)
135                })
136                .cloned()
137                .collect::<Vec<_>>();
138
139            let added_projects = envelope
140                .payload
141                .projects
142                .into_iter()
143                .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
144                .collect::<Vec<_>>();
145
146            (added_projects, removed_projects)
147        })?;
148
149        for dev_server_project in added_projects {
150            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
151        }
152
153        this.update(&mut cx, |this, cx| {
154            for old_project_id in &removed_projects_ids {
155                this.unshare_project(old_project_id, cx)?;
156            }
157            Ok::<(), anyhow::Error>(())
158        })??;
159        Ok(())
160    }
161
162    async fn handle_validate_dev_server_project_request(
163        this: Model<Self>,
164        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
165        _: Arc<Client>,
166        cx: AsyncAppContext,
167    ) -> Result<proto::Ack> {
168        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
169        let path = std::path::Path::new(&expanded);
170        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
171
172        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
173        if !path_exists {
174            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
175        }
176
177        Ok(proto::Ack {})
178    }
179
180    async fn handle_shutdown(
181        this: Model<Self>,
182        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
183        _: Arc<Client>,
184        mut cx: AsyncAppContext,
185    ) -> Result<()> {
186        this.update(&mut cx, |this, cx| {
187            this.remote_shutdown = true;
188            cx.quit();
189        })
190    }
191
192    fn unshare_project(
193        &mut self,
194        dev_server_project_id: &DevServerProjectId,
195        cx: &mut ModelContext<Self>,
196    ) -> Result<()> {
197        if let Some(project) = self.projects.remove(dev_server_project_id) {
198            project.update(cx, |project, cx| project.unshare(cx))?;
199        }
200        Ok(())
201    }
202
203    async fn share_project(
204        this: Model<Self>,
205        dev_server_project: &proto::DevServerProject,
206        cx: &mut AsyncAppContext,
207    ) -> Result<()> {
208        let (client, project) = this.update(cx, |this, cx| {
209            let project = Project::local(
210                this.client.clone(),
211                this.app_state.node_runtime.clone(),
212                this.app_state.user_store.clone(),
213                this.app_state.languages.clone(),
214                this.app_state.fs.clone(),
215                cx,
216            );
217
218            (this.client.clone(), project)
219        })?;
220
221        let path = shellexpand::tilde(&dev_server_project.path).to_string();
222
223        let (worktree, _) = project
224            .update(cx, |project, cx| {
225                project.find_or_create_local_worktree(&path, true, cx)
226            })?
227            .await?;
228
229        worktree.update(cx, |worktree, cx| {
230            worktree.as_local_mut().unwrap().share_private_files(cx)
231        })?;
232
233        let worktrees =
234            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
235
236        let response = client
237            .request(proto::ShareDevServerProject {
238                dev_server_project_id: dev_server_project.id,
239                worktrees,
240            })
241            .await?;
242
243        let project_id = response.project_id;
244        project.update(cx, |project, cx| project.shared(project_id, cx))??;
245        this.update(cx, |this, _| {
246            this.projects
247                .insert(DevServerProjectId(dev_server_project.id), project);
248        })?;
249        Ok(())
250    }
251
252    async fn maintain_connection(
253        this: WeakModel<Self>,
254        client: Arc<Client>,
255        mut cx: AsyncAppContext,
256    ) -> Result<()> {
257        let mut client_status = client.status();
258
259        let _ = client_status.try_recv();
260        let current_status = *client_status.borrow();
261        if current_status.is_connected() {
262            // wait for first disconnect
263            client_status.recv().await;
264        }
265
266        loop {
267            let Some(current_status) = client_status.recv().await else {
268                return Ok(());
269            };
270            let Some(this) = this.upgrade() else {
271                return Ok(());
272            };
273
274            if !current_status.is_connected() {
275                continue;
276            }
277
278            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
279        }
280    }
281
282    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
283        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
284        let request = self.client.request(proto::ReconnectDevServer {
285            reshared_projects: self
286                .projects
287                .iter()
288                .flat_map(|(_, handle)| {
289                    let project = handle.read(cx);
290                    let project_id = project.remote_id()?;
291                    projects.insert(project_id, handle.clone());
292                    Some(proto::UpdateProject {
293                        project_id,
294                        worktrees: project.worktree_metadata_protos(cx),
295                    })
296                })
297                .collect(),
298        });
299        cx.spawn(|_, mut cx| async move {
300            let response = request.await?;
301
302            for reshared_project in response.reshared_projects {
303                if let Some(project) = projects.get(&reshared_project.id) {
304                    project.update(&mut cx, |project, cx| {
305                        project.reshared(reshared_project, cx).log_err();
306                    })?;
307                }
308            }
309            Ok(())
310        })
311    }
312}