headless_project.rs

  1use anyhow::{anyhow, Result};
  2use fs::Fs;
  3use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext};
  4use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
  5use project::{
  6    buffer_store::{BufferStore, BufferStoreEvent},
  7    project_settings::SettingsObserver,
  8    search::SearchQuery,
  9    worktree_store::WorktreeStore,
 10    LspStore, LspStoreEvent, ProjectPath, WorktreeId,
 11};
 12use remote::SshSession;
 13use rpc::{
 14    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
 15    AnyProtoClient, TypedEnvelope,
 16};
 17use smol::stream::StreamExt;
 18use std::{
 19    path::{Path, PathBuf},
 20    sync::{atomic::AtomicUsize, Arc},
 21};
 22use util::ResultExt;
 23use worktree::Worktree;
 24
 25pub struct HeadlessProject {
 26    pub fs: Arc<dyn Fs>,
 27    pub session: AnyProtoClient,
 28    pub worktree_store: Model<WorktreeStore>,
 29    pub buffer_store: Model<BufferStore>,
 30    pub lsp_store: Model<LspStore>,
 31    pub settings_observer: Model<SettingsObserver>,
 32    pub next_entry_id: Arc<AtomicUsize>,
 33    pub languages: Arc<LanguageRegistry>,
 34}
 35
 36impl HeadlessProject {
 37    pub fn init(cx: &mut AppContext) {
 38        settings::init(cx);
 39        language::init(cx);
 40        project::Project::init_settings(cx);
 41    }
 42
 43    pub fn new(session: Arc<SshSession>, fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
 44        let mut languages = LanguageRegistry::new(cx.background_executor().clone());
 45        languages
 46            .set_language_server_download_dir(PathBuf::from("/Users/conrad/what-could-go-wrong"));
 47
 48        let languages = Arc::new(languages);
 49
 50        let worktree_store = cx.new_model(|_| WorktreeStore::new(true, fs.clone()));
 51        let buffer_store = cx.new_model(|cx| {
 52            let mut buffer_store =
 53                BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx);
 54            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 55            buffer_store
 56        });
 57        let settings_observer = cx.new_model(|cx| {
 58            let mut observer = SettingsObserver::new_local(fs.clone(), worktree_store.clone(), cx);
 59            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 60            observer
 61        });
 62        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
 63        let lsp_store = cx.new_model(|cx| {
 64            let mut lsp_store = LspStore::new_local(
 65                buffer_store.clone(),
 66                worktree_store.clone(),
 67                environment,
 68                languages.clone(),
 69                None,
 70                fs.clone(),
 71                cx,
 72            );
 73            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 74            lsp_store
 75        });
 76
 77        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 78
 79        cx.subscribe(
 80            &buffer_store,
 81            |_this, _buffer_store, event, cx| match event {
 82                BufferStoreEvent::BufferAdded(buffer) => {
 83                    cx.subscribe(buffer, Self::on_buffer_event).detach();
 84                }
 85                _ => {}
 86            },
 87        )
 88        .detach();
 89
 90        let client: AnyProtoClient = session.clone().into();
 91
 92        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
 93        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
 94        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
 95        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
 96        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
 97
 98        client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory);
 99
100        client.add_model_request_handler(Self::handle_add_worktree);
101        client.add_model_request_handler(Self::handle_open_buffer_by_path);
102        client.add_model_request_handler(Self::handle_find_search_candidates);
103
104        client.add_model_request_handler(BufferStore::handle_update_buffer);
105        client.add_model_message_handler(BufferStore::handle_close_buffer);
106
107        client.add_model_request_handler(LspStore::handle_create_language_server);
108        client.add_model_request_handler(LspStore::handle_which_command);
109        client.add_model_request_handler(LspStore::handle_shell_env);
110        client.add_model_request_handler(LspStore::handle_try_exec);
111        client.add_model_request_handler(LspStore::handle_read_text_file);
112
113        BufferStore::init(&client);
114        WorktreeStore::init(&client);
115        SettingsObserver::init(&client);
116        LspStore::init(&client);
117
118        HeadlessProject {
119            session: client,
120            settings_observer,
121            fs,
122            worktree_store,
123            buffer_store,
124            lsp_store,
125            next_entry_id: Default::default(),
126            languages,
127        }
128    }
129
130    fn on_buffer_event(
131        &mut self,
132        buffer: Model<Buffer>,
133        event: &BufferEvent,
134        cx: &mut ModelContext<Self>,
135    ) {
136        match event {
137            BufferEvent::Operation(op) => cx
138                .background_executor()
139                .spawn(self.session.request(proto::UpdateBuffer {
140                    project_id: SSH_PROJECT_ID,
141                    buffer_id: buffer.read(cx).remote_id().to_proto(),
142                    operations: vec![serialize_operation(op)],
143                }))
144                .detach(),
145            _ => {}
146        }
147    }
148
149    fn on_lsp_store_event(
150        &mut self,
151        _lsp_store: Model<LspStore>,
152        event: &LspStoreEvent,
153        _cx: &mut ModelContext<Self>,
154    ) {
155        match event {
156            LspStoreEvent::LanguageServerUpdate {
157                language_server_id,
158                message,
159            } => {
160                self.session
161                    .send(proto::UpdateLanguageServer {
162                        project_id: SSH_PROJECT_ID,
163                        language_server_id: language_server_id.to_proto(),
164                        variant: Some(message.clone()),
165                    })
166                    .log_err();
167            }
168            _ => {}
169        }
170    }
171
172    pub async fn handle_add_worktree(
173        this: Model<Self>,
174        message: TypedEnvelope<proto::AddWorktree>,
175        mut cx: AsyncAppContext,
176    ) -> Result<proto::AddWorktreeResponse> {
177        let path = shellexpand::tilde(&message.payload.path).to_string();
178        let worktree = this
179            .update(&mut cx.clone(), |this, _| {
180                Worktree::local(
181                    Path::new(&path),
182                    true,
183                    this.fs.clone(),
184                    this.next_entry_id.clone(),
185                    &mut cx,
186                )
187            })?
188            .await?;
189
190        this.update(&mut cx, |this, cx| {
191            let session = this.session.clone();
192            this.worktree_store.update(cx, |worktree_store, cx| {
193                worktree_store.add(&worktree, cx);
194            });
195            worktree.update(cx, |worktree, cx| {
196                worktree.observe_updates(0, cx, move |update| {
197                    session.send(update).ok();
198                    futures::future::ready(true)
199                });
200                proto::AddWorktreeResponse {
201                    worktree_id: worktree.id().to_proto(),
202                }
203            })
204        })
205    }
206
207    pub async fn handle_open_buffer_by_path(
208        this: Model<Self>,
209        message: TypedEnvelope<proto::OpenBufferByPath>,
210        mut cx: AsyncAppContext,
211    ) -> Result<proto::OpenBufferResponse> {
212        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
213        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
214            let buffer_store = this.buffer_store.clone();
215            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
216                buffer_store.open_buffer(
217                    ProjectPath {
218                        worktree_id,
219                        path: PathBuf::from(message.payload.path).into(),
220                    },
221                    cx,
222                )
223            });
224            anyhow::Ok((buffer_store, buffer))
225        })??;
226
227        let buffer = buffer.await?;
228        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
229        buffer_store.update(&mut cx, |buffer_store, cx| {
230            buffer_store
231                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
232                .detach_and_log_err(cx);
233        })?;
234
235        Ok(proto::OpenBufferResponse {
236            buffer_id: buffer_id.to_proto(),
237        })
238    }
239
240    pub async fn handle_find_search_candidates(
241        this: Model<Self>,
242        envelope: TypedEnvelope<proto::FindSearchCandidates>,
243        mut cx: AsyncAppContext,
244    ) -> Result<proto::FindSearchCandidatesResponse> {
245        let message = envelope.payload;
246        let query = SearchQuery::from_proto(
247            message
248                .query
249                .ok_or_else(|| anyhow!("missing query field"))?,
250        )?;
251        let mut results = this.update(&mut cx, |this, cx| {
252            this.buffer_store.update(cx, |buffer_store, cx| {
253                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
254            })
255        })?;
256
257        let mut response = proto::FindSearchCandidatesResponse {
258            buffer_ids: Vec::new(),
259        };
260
261        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
262
263        while let Some(buffer) = results.next().await {
264            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
265            response.buffer_ids.push(buffer_id.to_proto());
266            buffer_store
267                .update(&mut cx, |buffer_store, cx| {
268                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
269                })?
270                .await?;
271        }
272
273        Ok(response)
274    }
275
276    pub async fn handle_list_remote_directory(
277        this: Model<Self>,
278        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
279        cx: AsyncAppContext,
280    ) -> Result<proto::ListRemoteDirectoryResponse> {
281        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
282        let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
283
284        let mut entries = Vec::new();
285        let mut response = fs.read_dir(Path::new(&expanded)).await?;
286        while let Some(path) = response.next().await {
287            if let Some(file_name) = path?.file_name() {
288                entries.push(file_name.to_string_lossy().to_string());
289            }
290        }
291        Ok(proto::ListRemoteDirectoryResponse { entries })
292    }
293}