headless_project.rs

  1use anyhow::{anyhow, Result};
  2use fs::Fs;
  3use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext};
  4use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
  5use project::{
  6    buffer_store::{BufferStore, BufferStoreEvent},
  7    project_settings::SettingsObserver,
  8    search::SearchQuery,
  9    worktree_store::WorktreeStore,
 10    LspStore, LspStoreEvent, ProjectPath, WorktreeId,
 11};
 12use remote::SshSession;
 13use rpc::{
 14    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
 15    AnyProtoClient, TypedEnvelope,
 16};
 17use smol::stream::StreamExt;
 18use std::{
 19    path::{Path, PathBuf},
 20    sync::{atomic::AtomicUsize, Arc},
 21};
 22use util::ResultExt;
 23use worktree::Worktree;
 24
 25pub struct HeadlessProject {
 26    pub fs: Arc<dyn Fs>,
 27    pub session: AnyProtoClient,
 28    pub worktree_store: Model<WorktreeStore>,
 29    pub buffer_store: Model<BufferStore>,
 30    pub lsp_store: Model<LspStore>,
 31    pub settings_observer: Model<SettingsObserver>,
 32    pub next_entry_id: Arc<AtomicUsize>,
 33    pub languages: Arc<LanguageRegistry>,
 34}
 35
 36impl HeadlessProject {
 37    pub fn init(cx: &mut AppContext) {
 38        settings::init(cx);
 39        language::init(cx);
 40        project::Project::init_settings(cx);
 41    }
 42
 43    pub fn new(session: Arc<SshSession>, fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
 44        let mut languages = LanguageRegistry::new(cx.background_executor().clone());
 45        languages
 46            .set_language_server_download_dir(PathBuf::from("/Users/conrad/what-could-go-wrong"));
 47
 48        let languages = Arc::new(languages);
 49
 50        let worktree_store = cx.new_model(|_| WorktreeStore::new(true, fs.clone()));
 51        let buffer_store = cx.new_model(|cx| {
 52            let mut buffer_store =
 53                BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx);
 54            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 55            buffer_store
 56        });
 57        let settings_observer = cx.new_model(|cx| {
 58            let mut observer = SettingsObserver::new_local(fs.clone(), worktree_store.clone(), cx);
 59            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 60            observer
 61        });
 62        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
 63        let lsp_store = cx.new_model(|cx| {
 64            let mut lsp_store = LspStore::new_local(
 65                buffer_store.clone(),
 66                worktree_store.clone(),
 67                environment,
 68                languages.clone(),
 69                None,
 70                fs.clone(),
 71                cx,
 72            );
 73            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 74            lsp_store
 75        });
 76
 77        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 78
 79        cx.subscribe(
 80            &buffer_store,
 81            |_this, _buffer_store, event, cx| match event {
 82                BufferStoreEvent::BufferAdded(buffer) => {
 83                    cx.subscribe(buffer, Self::on_buffer_event).detach();
 84                }
 85                _ => {}
 86            },
 87        )
 88        .detach();
 89
 90        let client: AnyProtoClient = session.clone().into();
 91
 92        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
 93        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
 94        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
 95        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
 96        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
 97
 98        client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory);
 99
100        client.add_model_request_handler(Self::handle_add_worktree);
101        client.add_model_request_handler(Self::handle_open_buffer_by_path);
102        client.add_model_request_handler(Self::handle_find_search_candidates);
103
104        client.add_model_request_handler(BufferStore::handle_update_buffer);
105        client.add_model_message_handler(BufferStore::handle_close_buffer);
106
107        client.add_model_request_handler(LspStore::handle_create_language_server);
108        client.add_model_request_handler(LspStore::handle_which_command);
109        client.add_model_request_handler(LspStore::handle_shell_env);
110
111        BufferStore::init(&client);
112        WorktreeStore::init(&client);
113        SettingsObserver::init(&client);
114        LspStore::init(&client);
115
116        HeadlessProject {
117            session: client,
118            settings_observer,
119            fs,
120            worktree_store,
121            buffer_store,
122            lsp_store,
123            next_entry_id: Default::default(),
124            languages,
125        }
126    }
127
128    fn on_buffer_event(
129        &mut self,
130        buffer: Model<Buffer>,
131        event: &BufferEvent,
132        cx: &mut ModelContext<Self>,
133    ) {
134        match event {
135            BufferEvent::Operation(op) => cx
136                .background_executor()
137                .spawn(self.session.request(proto::UpdateBuffer {
138                    project_id: SSH_PROJECT_ID,
139                    buffer_id: buffer.read(cx).remote_id().to_proto(),
140                    operations: vec![serialize_operation(op)],
141                }))
142                .detach(),
143            _ => {}
144        }
145    }
146
147    fn on_lsp_store_event(
148        &mut self,
149        _lsp_store: Model<LspStore>,
150        event: &LspStoreEvent,
151        _cx: &mut ModelContext<Self>,
152    ) {
153        match event {
154            LspStoreEvent::LanguageServerUpdate {
155                language_server_id,
156                message,
157            } => {
158                self.session
159                    .send(proto::UpdateLanguageServer {
160                        project_id: SSH_PROJECT_ID,
161                        language_server_id: language_server_id.to_proto(),
162                        variant: Some(message.clone()),
163                    })
164                    .log_err();
165            }
166            _ => {}
167        }
168    }
169
170    pub async fn handle_add_worktree(
171        this: Model<Self>,
172        message: TypedEnvelope<proto::AddWorktree>,
173        mut cx: AsyncAppContext,
174    ) -> Result<proto::AddWorktreeResponse> {
175        let path = shellexpand::tilde(&message.payload.path).to_string();
176        let worktree = this
177            .update(&mut cx.clone(), |this, _| {
178                Worktree::local(
179                    Path::new(&path),
180                    true,
181                    this.fs.clone(),
182                    this.next_entry_id.clone(),
183                    &mut cx,
184                )
185            })?
186            .await?;
187
188        this.update(&mut cx, |this, cx| {
189            let session = this.session.clone();
190            this.worktree_store.update(cx, |worktree_store, cx| {
191                worktree_store.add(&worktree, cx);
192            });
193            worktree.update(cx, |worktree, cx| {
194                worktree.observe_updates(0, cx, move |update| {
195                    session.send(update).ok();
196                    futures::future::ready(true)
197                });
198                proto::AddWorktreeResponse {
199                    worktree_id: worktree.id().to_proto(),
200                }
201            })
202        })
203    }
204
205    pub async fn handle_open_buffer_by_path(
206        this: Model<Self>,
207        message: TypedEnvelope<proto::OpenBufferByPath>,
208        mut cx: AsyncAppContext,
209    ) -> Result<proto::OpenBufferResponse> {
210        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
211        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
212            let buffer_store = this.buffer_store.clone();
213            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
214                buffer_store.open_buffer(
215                    ProjectPath {
216                        worktree_id,
217                        path: PathBuf::from(message.payload.path).into(),
218                    },
219                    cx,
220                )
221            });
222            anyhow::Ok((buffer_store, buffer))
223        })??;
224
225        let buffer = buffer.await?;
226        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
227        buffer_store.update(&mut cx, |buffer_store, cx| {
228            buffer_store
229                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
230                .detach_and_log_err(cx);
231        })?;
232
233        Ok(proto::OpenBufferResponse {
234            buffer_id: buffer_id.to_proto(),
235        })
236    }
237
238    pub async fn handle_find_search_candidates(
239        this: Model<Self>,
240        envelope: TypedEnvelope<proto::FindSearchCandidates>,
241        mut cx: AsyncAppContext,
242    ) -> Result<proto::FindSearchCandidatesResponse> {
243        let message = envelope.payload;
244        let query = SearchQuery::from_proto(
245            message
246                .query
247                .ok_or_else(|| anyhow!("missing query field"))?,
248        )?;
249        let mut results = this.update(&mut cx, |this, cx| {
250            this.buffer_store.update(cx, |buffer_store, cx| {
251                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
252            })
253        })?;
254
255        let mut response = proto::FindSearchCandidatesResponse {
256            buffer_ids: Vec::new(),
257        };
258
259        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
260
261        while let Some(buffer) = results.next().await {
262            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
263            response.buffer_ids.push(buffer_id.to_proto());
264            buffer_store
265                .update(&mut cx, |buffer_store, cx| {
266                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
267                })?
268                .await?;
269        }
270
271        Ok(response)
272    }
273
274    pub async fn handle_list_remote_directory(
275        this: Model<Self>,
276        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
277        cx: AsyncAppContext,
278    ) -> Result<proto::ListRemoteDirectoryResponse> {
279        let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
280        let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
281
282        let mut entries = Vec::new();
283        let mut response = fs.read_dir(Path::new(&expanded)).await?;
284        while let Some(path) = response.next().await {
285            if let Some(file_name) = path?.file_name() {
286                entries.push(file_name.to_string_lossy().to_string());
287            }
288        }
289        Ok(proto::ListRemoteDirectoryResponse { entries })
290    }
291}