headless_project.rs

  1use anyhow::{anyhow, Result};
  2use fs::Fs;
  3use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext};
  4use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
  5use node_runtime::NodeRuntime;
  6use project::{
  7    buffer_store::{BufferStore, BufferStoreEvent},
  8    project_settings::SettingsObserver,
  9    search::SearchQuery,
 10    worktree_store::WorktreeStore,
 11    LspStore, LspStoreEvent, PrettierStore, ProjectPath, WorktreeId,
 12};
 13use remote::ssh_session::ChannelClient;
 14use rpc::{
 15    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
 16    AnyProtoClient, TypedEnvelope,
 17};
 18use smol::stream::StreamExt;
 19use std::{
 20    path::{Path, PathBuf},
 21    sync::{atomic::AtomicUsize, Arc},
 22};
 23use util::ResultExt;
 24use worktree::Worktree;
 25
 26pub struct HeadlessProject {
 27    pub fs: Arc<dyn Fs>,
 28    pub session: AnyProtoClient,
 29    pub worktree_store: Model<WorktreeStore>,
 30    pub buffer_store: Model<BufferStore>,
 31    pub lsp_store: Model<LspStore>,
 32    pub settings_observer: Model<SettingsObserver>,
 33    pub next_entry_id: Arc<AtomicUsize>,
 34    pub languages: Arc<LanguageRegistry>,
 35}
 36
 37impl HeadlessProject {
 38    pub fn init(cx: &mut AppContext) {
 39        settings::init(cx);
 40        language::init(cx);
 41        project::Project::init_settings(cx);
 42    }
 43
 44    pub fn new(session: Arc<ChannelClient>, fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
 45        let languages = Arc::new(LanguageRegistry::new(cx.background_executor().clone()));
 46
 47        let node_runtime = NodeRuntime::unavailable();
 48
 49        languages::init(languages.clone(), node_runtime.clone(), cx);
 50
 51        let worktree_store = cx.new_model(|cx| {
 52            let mut store = WorktreeStore::local(true, fs.clone());
 53            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 54            store
 55        });
 56        let buffer_store = cx.new_model(|cx| {
 57            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 58            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 59            buffer_store
 60        });
 61        let prettier_store = cx.new_model(|cx| {
 62            PrettierStore::new(
 63                node_runtime,
 64                fs.clone(),
 65                languages.clone(),
 66                worktree_store.clone(),
 67                cx,
 68            )
 69        });
 70
 71        let settings_observer = cx.new_model(|cx| {
 72            let mut observer = SettingsObserver::new_local(fs.clone(), worktree_store.clone(), cx);
 73            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 74            observer
 75        });
 76        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
 77        let lsp_store = cx.new_model(|cx| {
 78            let mut lsp_store = LspStore::new_local(
 79                buffer_store.clone(),
 80                worktree_store.clone(),
 81                prettier_store.clone(),
 82                environment,
 83                languages.clone(),
 84                None,
 85                fs.clone(),
 86                cx,
 87            );
 88            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 89            lsp_store
 90        });
 91
 92        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 93
 94        cx.subscribe(
 95            &buffer_store,
 96            |_this, _buffer_store, event, cx| match event {
 97                BufferStoreEvent::BufferAdded(buffer) => {
 98                    cx.subscribe(buffer, Self::on_buffer_event).detach();
 99                }
100                _ => {}
101            },
102        )
103        .detach();
104
105        let client: AnyProtoClient = session.clone().into();
106
107        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
108        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
109        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
110        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
111        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
112
113        client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory);
114        client.add_request_handler(cx.weak_model(), Self::handle_check_file_exists);
115        client.add_request_handler(cx.weak_model(), Self::handle_shutdown_remote_server);
116        client.add_request_handler(cx.weak_model(), Self::handle_ping);
117
118        client.add_model_request_handler(Self::handle_add_worktree);
119        client.add_model_request_handler(Self::handle_open_buffer_by_path);
120        client.add_model_request_handler(Self::handle_find_search_candidates);
121
122        client.add_model_request_handler(BufferStore::handle_update_buffer);
123        client.add_model_message_handler(BufferStore::handle_close_buffer);
124
125        BufferStore::init(&client);
126        WorktreeStore::init(&client);
127        SettingsObserver::init(&client);
128        LspStore::init(&client);
129
130        HeadlessProject {
131            session: client,
132            settings_observer,
133            fs,
134            worktree_store,
135            buffer_store,
136            lsp_store,
137            next_entry_id: Default::default(),
138            languages,
139        }
140    }
141
142    fn on_buffer_event(
143        &mut self,
144        buffer: Model<Buffer>,
145        event: &BufferEvent,
146        cx: &mut ModelContext<Self>,
147    ) {
148        match event {
149            BufferEvent::Operation {
150                operation,
151                is_local: true,
152            } => cx
153                .background_executor()
154                .spawn(self.session.request(proto::UpdateBuffer {
155                    project_id: SSH_PROJECT_ID,
156                    buffer_id: buffer.read(cx).remote_id().to_proto(),
157                    operations: vec![serialize_operation(operation)],
158                }))
159                .detach(),
160            _ => {}
161        }
162    }
163
164    fn on_lsp_store_event(
165        &mut self,
166        _lsp_store: Model<LspStore>,
167        event: &LspStoreEvent,
168        _cx: &mut ModelContext<Self>,
169    ) {
170        match event {
171            LspStoreEvent::LanguageServerUpdate {
172                language_server_id,
173                message,
174            } => {
175                self.session
176                    .send(proto::UpdateLanguageServer {
177                        project_id: SSH_PROJECT_ID,
178                        language_server_id: language_server_id.to_proto(),
179                        variant: Some(message.clone()),
180                    })
181                    .log_err();
182            }
183            _ => {}
184        }
185    }
186
187    pub async fn handle_add_worktree(
188        this: Model<Self>,
189        message: TypedEnvelope<proto::AddWorktree>,
190        mut cx: AsyncAppContext,
191    ) -> Result<proto::AddWorktreeResponse> {
192        use client::ErrorCodeExt;
193        let path = shellexpand::tilde(&message.payload.path).to_string();
194
195        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
196        let path = PathBuf::from(path);
197
198        let canonicalized = match fs.canonicalize(&path).await {
199            Ok(path) => path,
200            Err(e) => {
201                let mut parent = path
202                    .parent()
203                    .ok_or(e)
204                    .map_err(|_| anyhow!("{:?} does not exist", path))?;
205                if parent == Path::new("") {
206                    parent = util::paths::home_dir();
207                }
208                let parent = fs.canonicalize(parent).await.map_err(|_| {
209                    anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
210                        .with_tag("path", &path.to_string_lossy().as_ref()))
211                })?;
212                parent.join(path.file_name().unwrap())
213            }
214        };
215
216        let worktree = this
217            .update(&mut cx.clone(), |this, _| {
218                Worktree::local(
219                    Arc::from(canonicalized),
220                    true,
221                    this.fs.clone(),
222                    this.next_entry_id.clone(),
223                    &mut cx,
224                )
225            })?
226            .await?;
227
228        this.update(&mut cx, |this, cx| {
229            this.worktree_store.update(cx, |worktree_store, cx| {
230                worktree_store.add(&worktree, cx);
231            });
232            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
233                worktree_id: worktree.id().to_proto(),
234            })
235        })
236    }
237
238    pub async fn handle_open_buffer_by_path(
239        this: Model<Self>,
240        message: TypedEnvelope<proto::OpenBufferByPath>,
241        mut cx: AsyncAppContext,
242    ) -> Result<proto::OpenBufferResponse> {
243        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
244        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
245            let buffer_store = this.buffer_store.clone();
246            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
247                buffer_store.open_buffer(
248                    ProjectPath {
249                        worktree_id,
250                        path: PathBuf::from(message.payload.path).into(),
251                    },
252                    cx,
253                )
254            });
255            anyhow::Ok((buffer_store, buffer))
256        })??;
257
258        let buffer = buffer.await?;
259        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
260        buffer_store.update(&mut cx, |buffer_store, cx| {
261            buffer_store
262                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
263                .detach_and_log_err(cx);
264        })?;
265
266        Ok(proto::OpenBufferResponse {
267            buffer_id: buffer_id.to_proto(),
268        })
269    }
270
271    pub async fn handle_find_search_candidates(
272        this: Model<Self>,
273        envelope: TypedEnvelope<proto::FindSearchCandidates>,
274        mut cx: AsyncAppContext,
275    ) -> Result<proto::FindSearchCandidatesResponse> {
276        let message = envelope.payload;
277        let query = SearchQuery::from_proto(
278            message
279                .query
280                .ok_or_else(|| anyhow!("missing query field"))?,
281        )?;
282        let mut results = this.update(&mut cx, |this, cx| {
283            this.buffer_store.update(cx, |buffer_store, cx| {
284                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
285            })
286        })?;
287
288        let mut response = proto::FindSearchCandidatesResponse {
289            buffer_ids: Vec::new(),
290        };
291
292        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
293
294        while let Some(buffer) = results.next().await {
295            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
296            response.buffer_ids.push(buffer_id.to_proto());
297            buffer_store
298                .update(&mut cx, |buffer_store, cx| {
299                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
300                })?
301                .await?;
302        }
303
304        Ok(response)
305    }
306
307    pub async fn handle_list_remote_directory(
308        this: Model<Self>,
309        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
310        cx: AsyncAppContext,
311    ) -> Result<proto::ListRemoteDirectoryResponse> {
312        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
313        let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
314
315        let mut entries = Vec::new();
316        let mut response = fs.read_dir(Path::new(&expanded)).await?;
317        while let Some(path) = response.next().await {
318            if let Some(file_name) = path?.file_name() {
319                entries.push(file_name.to_string_lossy().to_string());
320            }
321        }
322        Ok(proto::ListRemoteDirectoryResponse { entries })
323    }
324
325    pub async fn handle_check_file_exists(
326        this: Model<Self>,
327        envelope: TypedEnvelope<proto::CheckFileExists>,
328        cx: AsyncAppContext,
329    ) -> Result<proto::CheckFileExistsResponse> {
330        let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
331        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
332
333        let exists = fs.is_file(&PathBuf::from(expanded.clone())).await;
334
335        Ok(proto::CheckFileExistsResponse {
336            exists,
337            path: expanded,
338        })
339    }
340
341    pub async fn handle_shutdown_remote_server(
342        _this: Model<Self>,
343        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
344        cx: AsyncAppContext,
345    ) -> Result<proto::Ack> {
346        cx.spawn(|cx| async move {
347            cx.update(|cx| {
348                // TODO: This is a hack, because in a headless project, shutdown isn't executed
349                // when calling quit, but it should be.
350                cx.shutdown();
351                cx.quit();
352            })
353        })
354        .detach();
355
356        Ok(proto::Ack {})
357    }
358
359    pub async fn handle_ping(
360        _this: Model<Self>,
361        _envelope: TypedEnvelope<proto::Ping>,
362        _cx: AsyncAppContext,
363    ) -> Result<proto::Ack> {
364        log::debug!("Received ping from client");
365        Ok(proto::Ack {})
366    }
367}