headless.rs

  1use anyhow::Result;
  2use client::{user::UserStore, Client, ClientSettings, RemoteProjectId};
  3use fs::Fs;
  4use futures::Future;
  5use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
  6use language::LanguageRegistry;
  7use node_runtime::NodeRuntime;
  8use postage::stream::Stream;
  9use project::Project;
 10use rpc::{proto, TypedEnvelope};
 11use settings::Settings;
 12use std::{collections::HashMap, sync::Arc};
 13use util::{ResultExt, TryFutureExt};
 14
 15pub struct DevServer {
 16    client: Arc<Client>,
 17    app_state: AppState,
 18    projects: HashMap<RemoteProjectId, Model<Project>>,
 19    _subscriptions: Vec<client::Subscription>,
 20    _maintain_connection: Task<Option<()>>,
 21}
 22
 23pub struct AppState {
 24    pub node_runtime: Arc<dyn NodeRuntime>,
 25    pub user_store: Model<UserStore>,
 26    pub languages: Arc<LanguageRegistry>,
 27    pub fs: Arc<dyn Fs>,
 28}
 29
 30struct GlobalDevServer(Model<DevServer>);
 31
 32impl Global for GlobalDevServer {}
 33
 34pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
 35    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 36    cx.set_global(GlobalDevServer(dev_server.clone()));
 37
 38    // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
 39    let (tx, rx) = futures::channel::oneshot::channel();
 40    set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
 41
 42    cx.spawn(|cx| async move {
 43        rx.await.log_err();
 44        log::info!("Received interrupt signal");
 45        cx.update(|cx| cx.quit()).log_err();
 46    })
 47    .detach();
 48
 49    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 50    cx.spawn(|cx| async move {
 51        match client.authenticate_and_connect(false, &cx).await {
 52            Ok(_) => {
 53                log::info!("Connected to {}", server_url);
 54            }
 55            Err(e) => {
 56                log::error!("Error connecting to {}: {}", server_url, e);
 57                cx.update(|cx| cx.quit()).log_err();
 58            }
 59        }
 60    })
 61    .detach();
 62}
 63
 64fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
 65where
 66    F: FnOnce() + 'static + Send,
 67{
 68    let f = std::sync::Mutex::new(Some(f));
 69    ctrlc::set_handler(move || {
 70        if let Ok(mut guard) = f.lock() {
 71            let f = guard.take().expect("f can only be taken once");
 72            f();
 73        }
 74    })
 75}
 76
 77impl DevServer {
 78    pub fn global(cx: &AppContext) -> Model<DevServer> {
 79        cx.global::<GlobalDevServer>().0.clone()
 80    }
 81
 82    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 83        cx.on_app_quit(Self::app_will_quit).detach();
 84
 85        let maintain_connection = cx.spawn({
 86            let client = client.clone();
 87            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 88        });
 89
 90        DevServer {
 91            _subscriptions: vec![
 92                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions)
 93            ],
 94            _maintain_connection: maintain_connection,
 95            projects: Default::default(),
 96            app_state,
 97            client,
 98        }
 99    }
100
101    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
102        let request = self.client.request(proto::ShutdownDevServer {});
103        async move {
104            request.await.log_err();
105        }
106    }
107
108    async fn handle_dev_server_instructions(
109        this: Model<Self>,
110        envelope: TypedEnvelope<proto::DevServerInstructions>,
111        _: Arc<Client>,
112        mut cx: AsyncAppContext,
113    ) -> Result<()> {
114        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
115            let removed_projects = this
116                .projects
117                .keys()
118                .filter(|remote_project_id| {
119                    !envelope
120                        .payload
121                        .projects
122                        .iter()
123                        .any(|p| p.id == remote_project_id.0)
124                })
125                .cloned()
126                .collect::<Vec<_>>();
127
128            let added_projects = envelope
129                .payload
130                .projects
131                .into_iter()
132                .filter(|project| !this.projects.contains_key(&RemoteProjectId(project.id)))
133                .collect::<Vec<_>>();
134
135            (added_projects, removed_projects)
136        })?;
137
138        for remote_project in added_projects {
139            DevServer::share_project(this.clone(), &remote_project, &mut cx).await?;
140        }
141
142        this.update(&mut cx, |this, cx| {
143            for old_project_id in &removed_projects_ids {
144                this.unshare_project(old_project_id, cx)?;
145            }
146            Ok::<(), anyhow::Error>(())
147        })??;
148        Ok(())
149    }
150
151    fn unshare_project(
152        &mut self,
153        remote_project_id: &RemoteProjectId,
154        cx: &mut ModelContext<Self>,
155    ) -> Result<()> {
156        if let Some(project) = self.projects.remove(remote_project_id) {
157            project.update(cx, |project, cx| project.unshare(cx))?;
158        }
159        Ok(())
160    }
161
162    async fn share_project(
163        this: Model<Self>,
164        remote_project: &proto::RemoteProject,
165        cx: &mut AsyncAppContext,
166    ) -> Result<()> {
167        let (client, project) = this.update(cx, |this, cx| {
168            let project = Project::local(
169                this.client.clone(),
170                this.app_state.node_runtime.clone(),
171                this.app_state.user_store.clone(),
172                this.app_state.languages.clone(),
173                this.app_state.fs.clone(),
174                cx,
175            );
176
177            (this.client.clone(), project)
178        })?;
179
180        project
181            .update(cx, |project, cx| {
182                project.find_or_create_local_worktree(&remote_project.path, true, cx)
183            })?
184            .await?;
185
186        let worktrees =
187            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
188
189        let response = client
190            .request(proto::ShareRemoteProject {
191                remote_project_id: remote_project.id,
192                worktrees,
193            })
194            .await?;
195
196        let project_id = response.project_id;
197        project.update(cx, |project, cx| project.shared(project_id, cx))??;
198        this.update(cx, |this, _| {
199            this.projects
200                .insert(RemoteProjectId(remote_project.id), project);
201        })?;
202        Ok(())
203    }
204
205    async fn maintain_connection(
206        this: WeakModel<Self>,
207        client: Arc<Client>,
208        mut cx: AsyncAppContext,
209    ) -> Result<()> {
210        let mut client_status = client.status();
211
212        let _ = client_status.try_recv();
213        let current_status = *client_status.borrow();
214        if current_status.is_connected() {
215            // wait for first disconnect
216            client_status.recv().await;
217        }
218
219        loop {
220            let Some(current_status) = client_status.recv().await else {
221                return Ok(());
222            };
223            let Some(this) = this.upgrade() else {
224                return Ok(());
225            };
226
227            if !current_status.is_connected() {
228                continue;
229            }
230
231            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
232        }
233    }
234
235    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
236        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
237        let request = self.client.request(proto::ReconnectDevServer {
238            reshared_projects: self
239                .projects
240                .iter()
241                .flat_map(|(_, handle)| {
242                    let project = handle.read(cx);
243                    let project_id = project.remote_id()?;
244                    projects.insert(project_id, handle.clone());
245                    Some(proto::UpdateProject {
246                        project_id,
247                        worktrees: project.worktree_metadata_protos(cx),
248                    })
249                })
250                .collect(),
251        });
252        cx.spawn(|_, mut cx| async move {
253            let response = request.await?;
254
255            for reshared_project in response.reshared_projects {
256                if let Some(project) = projects.get(&reshared_project.id) {
257                    project.update(&mut cx, |project, cx| {
258                        project.reshared(reshared_project, cx).log_err();
259                    })?;
260                }
261            }
262            Ok(())
263        })
264    }
265}