store.rs

  1use std::path::PathBuf;
  2use std::sync::Arc;
  3use std::sync::atomic::AtomicBool;
  4
  5use anyhow::{Context as _, Result, anyhow};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::FutureExt;
 10use futures::future::{self, BoxFuture, Shared};
 11use fuzzy::StringMatchCandidate;
 12use gpui::{App, BackgroundExecutor, Task};
 13use heed::Database;
 14use heed::types::SerdeBincode;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
 21#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 22pub struct ProviderId(pub Arc<str>);
 23
 24/// The name of a package.
 25#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 26pub struct PackageName(Arc<str>);
 27
 28impl From<&str> for PackageName {
 29    fn from(value: &str) -> Self {
 30        Self(value.into())
 31    }
 32}
 33
/// A source of documentation that can be indexed into an [`IndexedDocsDatabase`].
///
/// An [`IndexedDocsStore`] holds its provider as
/// `Box<dyn IndexedDocsProvider + Send + Sync + 'static>`, so implementations
/// must be `Send + Sync`.
#[async_trait]
pub trait IndexedDocsProvider {
    /// Returns the ID of this provider.
    fn id(&self) -> ProviderId;

    /// Returns the path to the database for this provider.
    ///
    /// The store opens its database in this directory and creates the directory
    /// if it does not exist yet.
    fn database_path(&self) -> PathBuf;

    /// Returns a list of packages as suggestions to be included in the search
    /// results.
    ///
    /// This can be used to provide completions for known packages (e.g., from the
    /// local project or a registry) before a package has been indexed.
    async fn suggest_packages(&self) -> Result<Vec<PackageName>>;

    /// Indexes the package with the given name.
    ///
    /// Presumably stores the package's docs by inserting entries into `database`
    /// (confirm against implementations). If this returns an error, the store
    /// records its message as the package's latest error.
    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
}
 52
/// A store for indexed docs.
///
/// Wraps a single [`IndexedDocsProvider`] together with its database. It also
/// tracks, per package, the indexing task currently in flight and the message
/// of the most recent indexing failure.
pub struct IndexedDocsStore {
    executor: BackgroundExecutor,
    /// The database, opened on `executor`. The future is `Shared` so every caller
    /// awaits the same open. The error is wrapped in `Arc` because `Shared`
    /// requires a cloneable output and `anyhow::Error` is not `Clone`.
    database_future:
        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
    /// Indexing tasks currently running, keyed by package. A task removes its own
    /// entry when it finishes.
    indexing_tasks_by_package:
        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
    /// Error message from the most recent failed indexing run of each package.
    /// The entry is cleared when a later run succeeds.
    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
}
 63
 64impl IndexedDocsStore {
 65    pub fn try_global(provider: ProviderId, cx: &App) -> Result<Arc<Self>> {
 66        let registry = IndexedDocsRegistry::global(cx);
 67        registry
 68            .get_provider_store(provider.clone())
 69            .with_context(|| format!("no indexed docs store found for {provider}"))
 70    }
 71
 72    pub fn new(
 73        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 74        executor: BackgroundExecutor,
 75    ) -> Self {
 76        let database_future = executor
 77            .spawn({
 78                let executor = executor.clone();
 79                let database_path = provider.database_path();
 80                async move { IndexedDocsDatabase::new(database_path, executor) }
 81            })
 82            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 83            .boxed()
 84            .shared();
 85
 86        Self {
 87            executor,
 88            database_future,
 89            provider,
 90            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 91            latest_errors_by_package: RwLock::new(HashMap::default()),
 92        }
 93    }
 94
 95    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 96        self.latest_errors_by_package.read().get(package).cloned()
 97    }
 98
 99    /// Returns whether the package with the given name is currently being indexed.
100    pub fn is_indexing(&self, package: &PackageName) -> bool {
101        self.indexing_tasks_by_package.read().contains_key(package)
102    }
103
104    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
105        self.database_future
106            .clone()
107            .await
108            .map_err(|err| anyhow!(err))?
109            .load(key)
110            .await
111    }
112
113    pub async fn load_many_by_prefix(&self, prefix: String) -> Result<Vec<(String, MarkdownDocs)>> {
114        self.database_future
115            .clone()
116            .await
117            .map_err(|err| anyhow!(err))?
118            .load_many_by_prefix(prefix)
119            .await
120    }
121
122    /// Returns whether any entries exist with the given prefix.
123    pub async fn any_with_prefix(&self, prefix: String) -> Result<bool> {
124        self.database_future
125            .clone()
126            .await
127            .map_err(|err| anyhow!(err))?
128            .any_with_prefix(prefix)
129            .await
130    }
131
132    pub fn suggest_packages(self: Arc<Self>) -> Task<Result<Vec<PackageName>>> {
133        let this = self.clone();
134        self.executor
135            .spawn(async move { this.provider.suggest_packages().await })
136    }
137
138    pub fn index(
139        self: Arc<Self>,
140        package: PackageName,
141    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
142        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
143            return existing_task.clone();
144        }
145
146        let indexing_task = self
147            .executor
148            .spawn({
149                let this = self.clone();
150                let package = package.clone();
151                async move {
152                    let _finally = util::defer({
153                        let this = this.clone();
154                        let package = package.clone();
155                        move || {
156                            this.indexing_tasks_by_package.write().remove(&package);
157                        }
158                    });
159
160                    let index_task = {
161                        let package = package.clone();
162                        async {
163                            let database = this
164                                .database_future
165                                .clone()
166                                .await
167                                .map_err(|err| anyhow!(err))?;
168                            this.provider.index(package, database).await
169                        }
170                    };
171
172                    let result = index_task.await.map_err(Arc::new);
173                    match &result {
174                        Ok(_) => {
175                            this.latest_errors_by_package.write().remove(&package);
176                        }
177                        Err(err) => {
178                            this.latest_errors_by_package
179                                .write()
180                                .insert(package, err.to_string().into());
181                        }
182                    }
183
184                    result
185                }
186            })
187            .shared();
188
189        self.indexing_tasks_by_package
190            .write()
191            .insert(package, indexing_task.clone());
192
193        indexing_task
194    }
195
196    pub fn search(&self, query: String) -> Task<Vec<String>> {
197        let executor = self.executor.clone();
198        let database_future = self.database_future.clone();
199        self.executor.spawn(async move {
200            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
201                return Vec::new();
202            };
203
204            let Some(items) = database.keys().await.log_err() else {
205                return Vec::new();
206            };
207
208            let candidates = items
209                .iter()
210                .enumerate()
211                .map(|(ix, item_path)| StringMatchCandidate::new(ix, &item_path))
212                .collect::<Vec<_>>();
213
214            let matches = fuzzy::match_strings(
215                &candidates,
216                &query,
217                false,
218                100,
219                &AtomicBool::default(),
220                executor,
221            )
222            .await;
223
224            matches
225                .into_iter()
226                .map(|mat| items[mat.candidate_id].clone())
227                .collect()
228        })
229    }
230}
231
232#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
233pub struct MarkdownDocs(pub String);
234
/// A heed (LMDB) database mapping string keys to [`MarkdownDocs`].
///
/// Every read and write is spawned on `executor` and exposed as a [`Task`].
pub struct IndexedDocsDatabase {
    executor: BackgroundExecutor,
    env: heed::Env,
    /// Keys and values are both serde/bincode-encoded.
    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
}
240
241impl IndexedDocsDatabase {
242    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
243        std::fs::create_dir_all(&path)?;
244
245        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
246        let env = unsafe {
247            heed::EnvOpenOptions::new()
248                .map_size(ONE_GB_IN_BYTES)
249                .max_dbs(1)
250                .open(path)?
251        };
252
253        let mut txn = env.write_txn()?;
254        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
255        txn.commit()?;
256
257        Ok(Self {
258            executor,
259            env,
260            entries,
261        })
262    }
263
264    pub fn keys(&self) -> Task<Result<Vec<String>>> {
265        let env = self.env.clone();
266        let entries = self.entries;
267
268        self.executor.spawn(async move {
269            let txn = env.read_txn()?;
270            let mut iter = entries.iter(&txn)?;
271            let mut keys = Vec::new();
272            while let Some((key, _value)) = iter.next().transpose()? {
273                keys.push(key);
274            }
275
276            Ok(keys)
277        })
278    }
279
280    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
281        let env = self.env.clone();
282        let entries = self.entries;
283
284        self.executor.spawn(async move {
285            let txn = env.read_txn()?;
286            entries
287                .get(&txn, &key)?
288                .with_context(|| format!("no docs found for {key}"))
289        })
290    }
291
292    pub fn load_many_by_prefix(&self, prefix: String) -> Task<Result<Vec<(String, MarkdownDocs)>>> {
293        let env = self.env.clone();
294        let entries = self.entries;
295
296        self.executor.spawn(async move {
297            let txn = env.read_txn()?;
298            let results = entries
299                .iter(&txn)?
300                .filter_map(|entry| {
301                    let (key, value) = entry.ok()?;
302                    if key.starts_with(&prefix) {
303                        Some((key, value))
304                    } else {
305                        None
306                    }
307                })
308                .collect::<Vec<_>>();
309
310            Ok(results)
311        })
312    }
313
314    /// Returns whether any entries exist with the given prefix.
315    pub fn any_with_prefix(&self, prefix: String) -> Task<Result<bool>> {
316        let env = self.env.clone();
317        let entries = self.entries;
318
319        self.executor.spawn(async move {
320            let txn = env.read_txn()?;
321            let any = entries
322                .iter(&txn)?
323                .any(|entry| entry.map_or(false, |(key, _value)| key.starts_with(&prefix)));
324            Ok(any)
325        })
326    }
327
328    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
329        let env = self.env.clone();
330        let entries = self.entries;
331
332        self.executor.spawn(async move {
333            let mut txn = env.write_txn()?;
334            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
335            txn.commit()?;
336            Ok(())
337        })
338    }
339}
340
341impl extension::KeyValueStoreDelegate for IndexedDocsDatabase {
342    fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
343        IndexedDocsDatabase::insert(&self, key, docs)
344    }
345}