headless.rs

  1use anyhow::{anyhow, Result};
  2use client::DevServerProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use extension::ExtensionStore;
  5use fs::Fs;
  6use futures::Future;
  7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  8use language::LanguageRegistry;
  9use node_runtime::NodeRuntime;
 10use postage::stream::Stream;
 11use project::Project;
 12use rpc::{proto, ErrorCode, TypedEnvelope};
 13use settings::{Settings, SettingsStore};
 14use std::{collections::HashMap, sync::Arc};
 15use util::{ResultExt, TryFutureExt};
 16
 17pub struct DevServer {
 18    client: Arc<Client>,
 19    app_state: AppState,
 20    remote_shutdown: bool,
 21    projects: HashMap<DevServerProjectId, Model<Project>>,
 22    _subscriptions: Vec<client::Subscription>,
 23    _maintain_connection: Task<Option<()>>,
 24}
 25
 26pub struct AppState {
 27    pub node_runtime: Arc<dyn NodeRuntime>,
 28    pub user_store: Model<UserStore>,
 29    pub languages: Arc<LanguageRegistry>,
 30    pub fs: Arc<dyn Fs>,
 31}
 32
 33struct GlobalDevServer(Model<DevServer>);
 34
 35impl Global for GlobalDevServer {}
 36
 37pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
 38    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 39    cx.set_global(GlobalDevServer(dev_server.clone()));
 40
 41    #[cfg(not(target_os = "windows"))]
 42    {
 43        use signal_hook::consts::{SIGINT, SIGTERM};
 44        use signal_hook::iterator::Signals;
 45        // Set up a handler when the dev server is shut down
 46        // with ctrl-c or kill
 47        let (tx, rx) = futures::channel::oneshot::channel();
 48        let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
 49        std::thread::spawn({
 50            move || {
 51                if let Some(sig) = signals.forever().next() {
 52                    tx.send(sig).log_err();
 53                }
 54            }
 55        });
 56        cx.spawn(|cx| async move {
 57            if let Ok(sig) = rx.await {
 58                log::info!("received signal {sig:?}");
 59                cx.update(|cx| cx.quit()).log_err();
 60            }
 61        })
 62        .detach();
 63    }
 64
 65    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 66    cx.spawn(|cx| async move {
 67        client
 68            .authenticate_and_connect(false, &cx)
 69            .await
 70            .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
 71    })
 72}
 73
 74impl DevServer {
 75    pub fn global(cx: &AppContext) -> Model<DevServer> {
 76        cx.global::<GlobalDevServer>().0.clone()
 77    }
 78
 79    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 80        cx.on_app_quit(Self::app_will_quit).detach();
 81
 82        let maintain_connection = cx.spawn({
 83            let client = client.clone();
 84            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 85        });
 86
 87        cx.observe_global::<SettingsStore>(|_, cx| {
 88            ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
 89        })
 90        .detach();
 91
 92        DevServer {
 93            _subscriptions: vec![
 94                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
 95                client.add_request_handler(
 96                    cx.weak_model(),
 97                    Self::handle_validate_dev_server_project_request,
 98                ),
 99                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
100            ],
101            _maintain_connection: maintain_connection,
102            projects: Default::default(),
103            remote_shutdown: false,
104            app_state,
105            client,
106        }
107    }
108
109    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
110        let request = if self.remote_shutdown {
111            None
112        } else {
113            Some(
114                self.client
115                    .request(proto::ShutdownDevServer { reason: None }),
116            )
117        };
118        async move {
119            if let Some(request) = request {
120                request.await.log_err();
121            }
122        }
123    }
124
125    async fn handle_dev_server_instructions(
126        this: Model<Self>,
127        envelope: TypedEnvelope<proto::DevServerInstructions>,
128        mut cx: AsyncAppContext,
129    ) -> Result<()> {
130        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
131            let removed_projects = this
132                .projects
133                .keys()
134                .filter(|dev_server_project_id| {
135                    !envelope
136                        .payload
137                        .projects
138                        .iter()
139                        .any(|p| p.id == dev_server_project_id.0)
140                })
141                .cloned()
142                .collect::<Vec<_>>();
143
144            let added_projects = envelope
145                .payload
146                .projects
147                .into_iter()
148                .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
149                .collect::<Vec<_>>();
150
151            (added_projects, removed_projects)
152        })?;
153
154        for dev_server_project in added_projects {
155            DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
156        }
157
158        this.update(&mut cx, |this, cx| {
159            for old_project_id in &removed_projects_ids {
160                this.unshare_project(old_project_id, cx)?;
161            }
162            Ok::<(), anyhow::Error>(())
163        })??;
164        Ok(())
165    }
166
167    async fn handle_validate_dev_server_project_request(
168        this: Model<Self>,
169        envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
170        cx: AsyncAppContext,
171    ) -> Result<proto::Ack> {
172        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
173        let path = std::path::Path::new(&expanded);
174        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
175
176        let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
177        if !path_exists {
178            return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
179        }
180
181        Ok(proto::Ack {})
182    }
183
184    async fn handle_shutdown(
185        this: Model<Self>,
186        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
187        mut cx: AsyncAppContext,
188    ) -> Result<()> {
189        this.update(&mut cx, |this, cx| {
190            this.remote_shutdown = true;
191            cx.quit();
192        })
193    }
194
195    fn unshare_project(
196        &mut self,
197        dev_server_project_id: &DevServerProjectId,
198        cx: &mut ModelContext<Self>,
199    ) -> Result<()> {
200        if let Some(project) = self.projects.remove(dev_server_project_id) {
201            project.update(cx, |project, cx| project.unshare(cx))?;
202        }
203        Ok(())
204    }
205
206    async fn share_project(
207        this: Model<Self>,
208        dev_server_project: &proto::DevServerProject,
209        cx: &mut AsyncAppContext,
210    ) -> Result<()> {
211        let (client, project) = this.update(cx, |this, cx| {
212            let project = Project::local(
213                this.client.clone(),
214                this.app_state.node_runtime.clone(),
215                this.app_state.user_store.clone(),
216                this.app_state.languages.clone(),
217                this.app_state.fs.clone(),
218                cx,
219            );
220
221            (this.client.clone(), project)
222        })?;
223
224        let path = shellexpand::tilde(&dev_server_project.path).to_string();
225
226        let (worktree, _) = project
227            .update(cx, |project, cx| {
228                project.find_or_create_local_worktree(&path, true, cx)
229            })?
230            .await?;
231
232        worktree.update(cx, |worktree, cx| {
233            worktree.as_local_mut().unwrap().share_private_files(cx)
234        })?;
235
236        let worktrees =
237            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
238
239        let response = client
240            .request(proto::ShareDevServerProject {
241                dev_server_project_id: dev_server_project.id,
242                worktrees,
243            })
244            .await?;
245
246        let project_id = response.project_id;
247        project.update(cx, |project, cx| project.shared(project_id, cx))??;
248        this.update(cx, |this, _| {
249            this.projects
250                .insert(DevServerProjectId(dev_server_project.id), project);
251        })?;
252        Ok(())
253    }
254
255    async fn maintain_connection(
256        this: WeakModel<Self>,
257        client: Arc<Client>,
258        mut cx: AsyncAppContext,
259    ) -> Result<()> {
260        let mut client_status = client.status();
261
262        let _ = client_status.try_recv();
263        let current_status = *client_status.borrow();
264        if current_status.is_connected() {
265            // wait for first disconnect
266            client_status.recv().await;
267        }
268
269        loop {
270            let Some(current_status) = client_status.recv().await else {
271                return Ok(());
272            };
273            let Some(this) = this.upgrade() else {
274                return Ok(());
275            };
276
277            if !current_status.is_connected() {
278                continue;
279            }
280
281            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
282        }
283    }
284
285    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
287        let request = self.client.request(proto::ReconnectDevServer {
288            reshared_projects: self
289                .projects
290                .iter()
291                .flat_map(|(_, handle)| {
292                    let project = handle.read(cx);
293                    let project_id = project.remote_id()?;
294                    projects.insert(project_id, handle.clone());
295                    Some(proto::UpdateProject {
296                        project_id,
297                        worktrees: project.worktree_metadata_protos(cx),
298                    })
299                })
300                .collect(),
301        });
302        cx.spawn(|_, mut cx| async move {
303            let response = request.await?;
304
305            for reshared_project in response.reshared_projects {
306                if let Some(project) = projects.get(&reshared_project.id) {
307                    project.update(&mut cx, |project, cx| {
308                        project.reshared(reshared_project, cx).log_err();
309                    })?;
310                }
311            }
312            Ok(())
313        })
314    }
315}