headless.rs

  1use anyhow::Result;
  2use client::RemoteProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{
  7    AppContext, AsyncAppContext, BorrowAppContext, Context, Global, Model, ModelContext, Task,
  8    WeakModel,
  9};
 10use language::LanguageRegistry;
 11use node_runtime::NodeRuntime;
 12use postage::stream::Stream;
 13use project::{Project, WorktreeSettings};
 14use rpc::{proto, ErrorCode, TypedEnvelope};
 15use settings::{Settings, SettingsStore};
 16use std::{collections::HashMap, sync::Arc};
 17use util::{ResultExt, TryFutureExt};
 18
 19pub struct DevServer {
 20    client: Arc<Client>,
 21    app_state: AppState,
 22    remote_shutdown: bool,
 23    projects: HashMap<RemoteProjectId, Model<Project>>,
 24    _subscriptions: Vec<client::Subscription>,
 25    _maintain_connection: Task<Option<()>>,
 26}
 27
 28pub struct AppState {
 29    pub node_runtime: Arc<dyn NodeRuntime>,
 30    pub user_store: Model<UserStore>,
 31    pub languages: Arc<LanguageRegistry>,
 32    pub fs: Arc<dyn Fs>,
 33}
 34
 35struct GlobalDevServer(Model<DevServer>);
 36
 37impl Global for GlobalDevServer {}
 38
 39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
 40    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 41    cx.set_global(GlobalDevServer(dev_server.clone()));
 42
 43    // Dev server cannot have any private files for now
 44    cx.update_global(|store: &mut SettingsStore, _| {
 45        let old_settings = store.get::<WorktreeSettings>(None);
 46        store.override_global(WorktreeSettings {
 47            private_files: Some(vec![]),
 48            ..old_settings.clone()
 49        });
 50    });
 51
 52    // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
 53    let (tx, rx) = futures::channel::oneshot::channel();
 54    set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
 55
 56    cx.spawn(|cx| async move {
 57        rx.await.log_err();
 58        log::info!("Received interrupt signal");
 59        cx.update(|cx| cx.quit()).log_err();
 60    })
 61    .detach();
 62
 63    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 64    cx.spawn(|cx| async move {
 65        match client.authenticate_and_connect(false, &cx).await {
 66            Ok(_) => {
 67                log::info!("Connected to {}", server_url);
 68            }
 69            Err(e) => {
 70                log::error!("Error connecting to '{}': {}", server_url, e);
 71                cx.update(|cx| cx.quit()).log_err();
 72            }
 73        }
 74    })
 75    .detach();
 76}
 77
 78fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
 79where
 80    F: FnOnce() + 'static + Send,
 81{
 82    let f = std::sync::Mutex::new(Some(f));
 83    ctrlc::set_handler(move || {
 84        if let Ok(mut guard) = f.lock() {
 85            let f = guard.take().expect("f can only be taken once");
 86            f();
 87        }
 88    })
 89}
 90
 91impl DevServer {
 92    pub fn global(cx: &AppContext) -> Model<DevServer> {
 93        cx.global::<GlobalDevServer>().0.clone()
 94    }
 95
 96    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 97        cx.on_app_quit(Self::app_will_quit).detach();
 98
 99        let maintain_connection = cx.spawn({
100            let client = client.clone();
101            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
102        });
103
104        DevServer {
105            _subscriptions: vec![
106                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
107                client.add_request_handler(
108                    cx.weak_model(),
109                    Self::handle_validate_remote_project_request,
110                ),
111                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
112            ],
113            _maintain_connection: maintain_connection,
114            projects: Default::default(),
115            remote_shutdown: false,
116            app_state,
117            client,
118        }
119    }
120
121    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
122        let request = if self.remote_shutdown {
123            None
124        } else {
125            Some(self.client.request(proto::ShutdownDevServer {}))
126        };
127        async move {
128            if let Some(request) = request {
129                request.await.log_err();
130            }
131        }
132    }
133
134    async fn handle_dev_server_instructions(
135        this: Model<Self>,
136        envelope: TypedEnvelope<proto::DevServerInstructions>,
137        _: Arc<Client>,
138        mut cx: AsyncAppContext,
139    ) -> Result<()> {
140        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
141            let removed_projects = this
142                .projects
143                .keys()
144                .filter(|remote_project_id| {
145                    !envelope
146                        .payload
147                        .projects
148                        .iter()
149                        .any(|p| p.id == remote_project_id.0)
150                })
151                .cloned()
152                .collect::<Vec<_>>();
153
154            let added_projects = envelope
155                .payload
156                .projects
157                .into_iter()
158                .filter(|project| !this.projects.contains_key(&RemoteProjectId(project.id)))
159                .collect::<Vec<_>>();
160
161            (added_projects, removed_projects)
162        })?;
163
164        for remote_project in added_projects {
165            DevServer::share_project(this.clone(), &remote_project, &mut cx).await?;
166        }
167
168        this.update(&mut cx, |this, cx| {
169            for old_project_id in &removed_projects_ids {
170                this.unshare_project(old_project_id, cx)?;
171            }
172            Ok::<(), anyhow::Error>(())
173        })??;
174        Ok(())
175    }
176
177    async fn handle_validate_remote_project_request(
178        this: Model<Self>,
179        envelope: TypedEnvelope<proto::ValidateRemoteProjectRequest>,
180        _: Arc<Client>,
181        cx: AsyncAppContext,
182    ) -> Result<proto::Ack> {
183        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184        let path = std::path::Path::new(&expanded);
185        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187        let path_exists = fs.is_dir(path).await;
188        if !path_exists {
189            return Err(anyhow::anyhow!(ErrorCode::RemoteProjectPathDoesNotExist))?;
190        }
191
192        Ok(proto::Ack {})
193    }
194
195    async fn handle_shutdown(
196        this: Model<Self>,
197        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
198        _: Arc<Client>,
199        mut cx: AsyncAppContext,
200    ) -> Result<()> {
201        this.update(&mut cx, |this, cx| {
202            this.remote_shutdown = true;
203            cx.quit();
204        })
205    }
206
207    fn unshare_project(
208        &mut self,
209        remote_project_id: &RemoteProjectId,
210        cx: &mut ModelContext<Self>,
211    ) -> Result<()> {
212        if let Some(project) = self.projects.remove(remote_project_id) {
213            project.update(cx, |project, cx| project.unshare(cx))?;
214        }
215        Ok(())
216    }
217
218    async fn share_project(
219        this: Model<Self>,
220        remote_project: &proto::RemoteProject,
221        cx: &mut AsyncAppContext,
222    ) -> Result<()> {
223        let (client, project) = this.update(cx, |this, cx| {
224            let project = Project::local(
225                this.client.clone(),
226                this.app_state.node_runtime.clone(),
227                this.app_state.user_store.clone(),
228                this.app_state.languages.clone(),
229                this.app_state.fs.clone(),
230                cx,
231            );
232
233            (this.client.clone(), project)
234        })?;
235
236        let path = shellexpand::tilde(&remote_project.path).to_string();
237
238        project
239            .update(cx, |project, cx| {
240                project.find_or_create_local_worktree(&path, true, cx)
241            })?
242            .await?;
243
244        let worktrees =
245            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
246
247        let response = client
248            .request(proto::ShareRemoteProject {
249                remote_project_id: remote_project.id,
250                worktrees,
251            })
252            .await?;
253
254        let project_id = response.project_id;
255        project.update(cx, |project, cx| project.shared(project_id, cx))??;
256        this.update(cx, |this, _| {
257            this.projects
258                .insert(RemoteProjectId(remote_project.id), project);
259        })?;
260        Ok(())
261    }
262
263    async fn maintain_connection(
264        this: WeakModel<Self>,
265        client: Arc<Client>,
266        mut cx: AsyncAppContext,
267    ) -> Result<()> {
268        let mut client_status = client.status();
269
270        let _ = client_status.try_recv();
271        let current_status = *client_status.borrow();
272        if current_status.is_connected() {
273            // wait for first disconnect
274            client_status.recv().await;
275        }
276
277        loop {
278            let Some(current_status) = client_status.recv().await else {
279                return Ok(());
280            };
281            let Some(this) = this.upgrade() else {
282                return Ok(());
283            };
284
285            if !current_status.is_connected() {
286                continue;
287            }
288
289            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
290        }
291    }
292
293    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
294        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
295        let request = self.client.request(proto::ReconnectDevServer {
296            reshared_projects: self
297                .projects
298                .iter()
299                .flat_map(|(_, handle)| {
300                    let project = handle.read(cx);
301                    let project_id = project.remote_id()?;
302                    projects.insert(project_id, handle.clone());
303                    Some(proto::UpdateProject {
304                        project_id,
305                        worktrees: project.worktree_metadata_protos(cx),
306                    })
307                })
308                .collect(),
309        });
310        cx.spawn(|_, mut cx| async move {
311            let response = request.await?;
312
313            for reshared_project in response.reshared_projects {
314                if let Some(project) = projects.get(&reshared_project.id) {
315                    project.update(&mut cx, |project, cx| {
316                        project.reshared(reshared_project, cx).log_err();
317                    })?;
318                }
319            }
320            Ok(())
321        })
322    }
323}