store.rs

  1use std::path::PathBuf;
  2use std::sync::Arc;
  3use std::sync::atomic::AtomicBool;
  4
  5use anyhow::{Context as _, Result, anyhow};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::FutureExt;
 10use futures::future::{self, BoxFuture, Shared};
 11use fuzzy::StringMatchCandidate;
 12use gpui::{App, BackgroundExecutor, Task};
 13use heed::Database;
 14use heed::types::SerdeBincode;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
 21#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 22pub struct ProviderId(pub Arc<str>);
 23
 24/// The name of a package.
 25#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 26pub struct PackageName(Arc<str>);
 27
 28impl From<&str> for PackageName {
 29    fn from(value: &str) -> Self {
 30        Self(value.into())
 31    }
 32}
 33
 34#[async_trait]
 35pub trait IndexedDocsProvider {
 36    /// Returns the ID of this provider.
 37    fn id(&self) -> ProviderId;
 38
 39    /// Returns the path to the database for this provider.
 40    fn database_path(&self) -> PathBuf;
 41
 42    /// Returns a list of packages as suggestions to be included in the search
 43    /// results.
 44    ///
 45    /// This can be used to provide completions for known packages (e.g., from the
 46    /// local project or a registry) before a package has been indexed.
 47    async fn suggest_packages(&self) -> Result<Vec<PackageName>>;
 48
 49    /// Indexes the package with the given name.
 50    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
 51}
 52
 53/// A store for indexed docs.
 54pub struct IndexedDocsStore {
 55    executor: BackgroundExecutor,
 56    database_future:
 57        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
 58    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 59    indexing_tasks_by_package:
 60        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
 61    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
 62}
 63
 64impl IndexedDocsStore {
 65    pub fn try_global(provider: ProviderId, cx: &App) -> Result<Arc<Self>> {
 66        let registry = IndexedDocsRegistry::global(cx);
 67        registry
 68            .get_provider_store(provider.clone())
 69            .with_context(|| format!("no indexed docs store found for {provider}"))
 70    }
 71
 72    pub fn new(
 73        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 74        executor: BackgroundExecutor,
 75    ) -> Self {
 76        let database_future = executor
 77            .spawn({
 78                let executor = executor.clone();
 79                let database_path = provider.database_path();
 80                async move { IndexedDocsDatabase::new(database_path, executor) }
 81            })
 82            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 83            .boxed()
 84            .shared();
 85
 86        Self {
 87            executor,
 88            database_future,
 89            provider,
 90            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 91            latest_errors_by_package: RwLock::new(HashMap::default()),
 92        }
 93    }
 94
 95    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 96        self.latest_errors_by_package.read().get(package).cloned()
 97    }
 98
 99    /// Returns whether the package with the given name is currently being indexed.
100    pub fn is_indexing(&self, package: &PackageName) -> bool {
101        self.indexing_tasks_by_package.read().contains_key(package)
102    }
103
104    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
105        self.database_future
106            .clone()
107            .await
108            .map_err(|err| anyhow!(err))?
109            .load(key)
110            .await
111    }
112
113    pub async fn load_many_by_prefix(&self, prefix: String) -> Result<Vec<(String, MarkdownDocs)>> {
114        self.database_future
115            .clone()
116            .await
117            .map_err(|err| anyhow!(err))?
118            .load_many_by_prefix(prefix)
119            .await
120    }
121
122    /// Returns whether any entries exist with the given prefix.
123    pub async fn any_with_prefix(&self, prefix: String) -> Result<bool> {
124        self.database_future
125            .clone()
126            .await
127            .map_err(|err| anyhow!(err))?
128            .any_with_prefix(prefix)
129            .await
130    }
131
132    pub fn suggest_packages(self: Arc<Self>) -> Task<Result<Vec<PackageName>>> {
133        let this = self.clone();
134        self.executor
135            .spawn(async move { this.provider.suggest_packages().await })
136    }
137
138    pub fn index(
139        self: Arc<Self>,
140        package: PackageName,
141    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
142        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
143            return existing_task.clone();
144        }
145
146        let indexing_task = self
147            .executor
148            .spawn({
149                let this = self.clone();
150                let package = package.clone();
151                async move {
152                    let _finally = util::defer({
153                        let this = this.clone();
154                        let package = package.clone();
155                        move || {
156                            this.indexing_tasks_by_package.write().remove(&package);
157                        }
158                    });
159
160                    let index_task = {
161                        let package = package.clone();
162                        async {
163                            let database = this
164                                .database_future
165                                .clone()
166                                .await
167                                .map_err(|err| anyhow!(err))?;
168                            this.provider.index(package, database).await
169                        }
170                    };
171
172                    let result = index_task.await.map_err(Arc::new);
173                    match &result {
174                        Ok(_) => {
175                            this.latest_errors_by_package.write().remove(&package);
176                        }
177                        Err(err) => {
178                            this.latest_errors_by_package
179                                .write()
180                                .insert(package, err.to_string().into());
181                        }
182                    }
183
184                    result
185                }
186            })
187            .shared();
188
189        self.indexing_tasks_by_package
190            .write()
191            .insert(package, indexing_task.clone());
192
193        indexing_task
194    }
195
196    pub fn search(&self, query: String) -> Task<Vec<String>> {
197        let executor = self.executor.clone();
198        let database_future = self.database_future.clone();
199        self.executor.spawn(async move {
200            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
201                return Vec::new();
202            };
203
204            let Some(items) = database.keys().await.log_err() else {
205                return Vec::new();
206            };
207
208            let candidates = items
209                .iter()
210                .enumerate()
211                .map(|(ix, item_path)| StringMatchCandidate::new(ix, &item_path))
212                .collect::<Vec<_>>();
213
214            let matches = fuzzy::match_strings(
215                &candidates,
216                &query,
217                false,
218                true,
219                100,
220                &AtomicBool::default(),
221                executor,
222            )
223            .await;
224
225            matches
226                .into_iter()
227                .map(|mat| items[mat.candidate_id].clone())
228                .collect()
229        })
230    }
231}
232
233#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
234pub struct MarkdownDocs(pub String);
235
236pub struct IndexedDocsDatabase {
237    executor: BackgroundExecutor,
238    env: heed::Env,
239    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
240}
241
242impl IndexedDocsDatabase {
243    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
244        std::fs::create_dir_all(&path)?;
245
246        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
247        let env = unsafe {
248            heed::EnvOpenOptions::new()
249                .map_size(ONE_GB_IN_BYTES)
250                .max_dbs(1)
251                .open(path)?
252        };
253
254        let mut txn = env.write_txn()?;
255        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
256        txn.commit()?;
257
258        Ok(Self {
259            executor,
260            env,
261            entries,
262        })
263    }
264
265    pub fn keys(&self) -> Task<Result<Vec<String>>> {
266        let env = self.env.clone();
267        let entries = self.entries;
268
269        self.executor.spawn(async move {
270            let txn = env.read_txn()?;
271            let mut iter = entries.iter(&txn)?;
272            let mut keys = Vec::new();
273            while let Some((key, _value)) = iter.next().transpose()? {
274                keys.push(key);
275            }
276
277            Ok(keys)
278        })
279    }
280
281    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
282        let env = self.env.clone();
283        let entries = self.entries;
284
285        self.executor.spawn(async move {
286            let txn = env.read_txn()?;
287            entries
288                .get(&txn, &key)?
289                .with_context(|| format!("no docs found for {key}"))
290        })
291    }
292
293    pub fn load_many_by_prefix(&self, prefix: String) -> Task<Result<Vec<(String, MarkdownDocs)>>> {
294        let env = self.env.clone();
295        let entries = self.entries;
296
297        self.executor.spawn(async move {
298            let txn = env.read_txn()?;
299            let results = entries
300                .iter(&txn)?
301                .filter_map(|entry| {
302                    let (key, value) = entry.ok()?;
303                    if key.starts_with(&prefix) {
304                        Some((key, value))
305                    } else {
306                        None
307                    }
308                })
309                .collect::<Vec<_>>();
310
311            Ok(results)
312        })
313    }
314
315    /// Returns whether any entries exist with the given prefix.
316    pub fn any_with_prefix(&self, prefix: String) -> Task<Result<bool>> {
317        let env = self.env.clone();
318        let entries = self.entries;
319
320        self.executor.spawn(async move {
321            let txn = env.read_txn()?;
322            let any = entries
323                .iter(&txn)?
324                .any(|entry| entry.map_or(false, |(key, _value)| key.starts_with(&prefix)));
325            Ok(any)
326        })
327    }
328
329    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
330        let env = self.env.clone();
331        let entries = self.entries;
332
333        self.executor.spawn(async move {
334            let mut txn = env.write_txn()?;
335            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
336            txn.commit()?;
337            Ok(())
338        })
339    }
340}
341
342impl extension::KeyValueStoreDelegate for IndexedDocsDatabase {
343    fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
344        IndexedDocsDatabase::insert(&self, key, docs)
345    }
346}