store.rs

  1use std::path::PathBuf;
  2use std::sync::atomic::AtomicBool;
  3use std::sync::Arc;
  4
  5use anyhow::{anyhow, Result};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::future::{self, BoxFuture, Shared};
 10use futures::FutureExt;
 11use fuzzy::StringMatchCandidate;
 12use gpui::{AppContext, BackgroundExecutor, Task};
 13use heed::types::SerdeBincode;
 14use heed::Database;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
/// The ID of an [`IndexedDocsProvider`], as returned by [`IndexedDocsProvider::id`].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
pub struct ProviderId(pub Arc<str>);
 23
/// The name of a package.
///
/// [`IndexedDocsStore`] uses this as the key for its per-package indexing tasks and
/// latest-error messages.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
pub struct PackageName(Arc<str>);
 27
 28impl From<&str> for PackageName {
 29    fn from(value: &str) -> Self {
 30        Self(value.into())
 31    }
 32}
 33
/// A source of documentation that can be indexed into an [`IndexedDocsDatabase`].
///
/// An [`IndexedDocsStore`] owns one provider: it opens the database at
/// [`database_path`](Self::database_path) and calls [`index`](Self::index) to index a package.
#[async_trait]
pub trait IndexedDocsProvider {
    /// Returns the ID of this provider.
    fn id(&self) -> ProviderId;

    /// Returns the path to the database for this provider.
    fn database_path(&self) -> PathBuf;

    /// Indexes the package with the given name.
    ///
    /// `database` is the handle to this provider's database, supplied by the caller.
    /// NOTE(review): implementations presumably store their output through
    /// [`IndexedDocsDatabase::insert`] — confirm against the concrete providers.
    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
}
 45
/// A store for indexed docs.
///
/// Pairs one [`IndexedDocsProvider`] with its database and tracks per-package indexing state.
pub struct IndexedDocsStore {
    /// Executor used to spawn the database-opening, indexing and search tasks.
    executor: BackgroundExecutor,
    /// Shared future that resolves to the opened database, or to the error from opening it.
    database_future:
        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
    /// The provider whose packages this store indexes.
    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
    /// Indexing tasks that are currently registered, keyed by package.
    indexing_tasks_by_package:
        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
    /// Message of the most recent indexing failure per package; removed when indexing succeeds.
    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
}
 56
 57impl IndexedDocsStore {
 58    pub fn try_global(provider: ProviderId, cx: &AppContext) -> Result<Arc<Self>> {
 59        let registry = IndexedDocsRegistry::global(cx);
 60        registry
 61            .get_provider_store(provider.clone())
 62            .ok_or_else(|| anyhow!("no indexed docs store found for {provider}"))
 63    }
 64
 65    pub fn new(
 66        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 67        executor: BackgroundExecutor,
 68    ) -> Self {
 69        let database_future = executor
 70            .spawn({
 71                let executor = executor.clone();
 72                let database_path = provider.database_path();
 73                async move { IndexedDocsDatabase::new(database_path, executor) }
 74            })
 75            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 76            .boxed()
 77            .shared();
 78
 79        Self {
 80            executor,
 81            database_future,
 82            provider,
 83            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 84            latest_errors_by_package: RwLock::new(HashMap::default()),
 85        }
 86    }
 87
 88    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 89        self.latest_errors_by_package.read().get(package).cloned()
 90    }
 91
 92    /// Returns whether the package with the given name is currently being indexed.
 93    pub fn is_indexing(&self, package: &PackageName) -> bool {
 94        self.indexing_tasks_by_package.read().contains_key(package)
 95    }
 96
 97    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
 98        self.database_future
 99            .clone()
100            .await
101            .map_err(|err| anyhow!(err))?
102            .load(key)
103            .await
104    }
105
106    pub async fn load_many_by_prefix(&self, prefix: String) -> Result<Vec<(String, MarkdownDocs)>> {
107        self.database_future
108            .clone()
109            .await
110            .map_err(|err| anyhow!(err))?
111            .load_many_by_prefix(prefix)
112            .await
113    }
114
115    /// Returns whether any entries exist with the given prefix.
116    pub async fn any_with_prefix(&self, prefix: String) -> Result<bool> {
117        self.database_future
118            .clone()
119            .await
120            .map_err(|err| anyhow!(err))?
121            .any_with_prefix(prefix)
122            .await
123    }
124
125    pub fn index(
126        self: Arc<Self>,
127        package: PackageName,
128    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
129        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
130            return existing_task.clone();
131        }
132
133        let indexing_task = self
134            .executor
135            .spawn({
136                let this = self.clone();
137                let package = package.clone();
138                async move {
139                    let _finally = util::defer({
140                        let this = this.clone();
141                        let package = package.clone();
142                        move || {
143                            this.indexing_tasks_by_package.write().remove(&package);
144                        }
145                    });
146
147                    let index_task = {
148                        let package = package.clone();
149                        async {
150                            let database = this
151                                .database_future
152                                .clone()
153                                .await
154                                .map_err(|err| anyhow!(err))?;
155                            this.provider.index(package, database).await
156                        }
157                    };
158
159                    let result = index_task.await.map_err(Arc::new);
160                    match &result {
161                        Ok(_) => {
162                            this.latest_errors_by_package.write().remove(&package);
163                        }
164                        Err(err) => {
165                            this.latest_errors_by_package
166                                .write()
167                                .insert(package, err.to_string().into());
168                        }
169                    }
170
171                    result
172                }
173            })
174            .shared();
175
176        self.indexing_tasks_by_package
177            .write()
178            .insert(package, indexing_task.clone());
179
180        indexing_task
181    }
182
183    pub fn search(&self, query: String) -> Task<Vec<String>> {
184        let executor = self.executor.clone();
185        let database_future = self.database_future.clone();
186        self.executor.spawn(async move {
187            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
188                return Vec::new();
189            };
190
191            let Some(items) = database.keys().await.log_err() else {
192                return Vec::new();
193            };
194
195            let candidates = items
196                .iter()
197                .enumerate()
198                .map(|(ix, item_path)| StringMatchCandidate::new(ix, item_path.clone()))
199                .collect::<Vec<_>>();
200
201            let matches = fuzzy::match_strings(
202                &candidates,
203                &query,
204                false,
205                100,
206                &AtomicBool::default(),
207                executor,
208            )
209            .await;
210
211            matches
212                .into_iter()
213                .map(|mat| items[mat.candidate_id].clone())
214                .collect()
215        })
216    }
217}
218
/// Documentation content stored as the value of an [`IndexedDocsDatabase`] entry.
///
/// NOTE(review): presumably Markdown text, judging by the name — confirm with the providers.
#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
pub struct MarkdownDocs(pub String);
221
/// A `heed`-backed database mapping string keys to [`MarkdownDocs`].
///
/// Every operation is spawned on the stored executor and returned as a [`Task`].
pub struct IndexedDocsDatabase {
    /// Executor on which all database operations are spawned.
    executor: BackgroundExecutor,
    /// The `heed` environment that transactions are opened on.
    env: heed::Env,
    /// The single named database holding the entries; keys and values are bincode-encoded.
    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
}
227
228impl IndexedDocsDatabase {
229    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
230        std::fs::create_dir_all(&path)?;
231
232        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
233        let env = unsafe {
234            heed::EnvOpenOptions::new()
235                .map_size(ONE_GB_IN_BYTES)
236                .max_dbs(1)
237                .open(path)?
238        };
239
240        let mut txn = env.write_txn()?;
241        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
242        txn.commit()?;
243
244        Ok(Self {
245            executor,
246            env,
247            entries,
248        })
249    }
250
251    pub fn keys(&self) -> Task<Result<Vec<String>>> {
252        let env = self.env.clone();
253        let entries = self.entries;
254
255        self.executor.spawn(async move {
256            let txn = env.read_txn()?;
257            let mut iter = entries.iter(&txn)?;
258            let mut keys = Vec::new();
259            while let Some((key, _value)) = iter.next().transpose()? {
260                keys.push(key);
261            }
262
263            Ok(keys)
264        })
265    }
266
267    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
268        let env = self.env.clone();
269        let entries = self.entries;
270
271        self.executor.spawn(async move {
272            let txn = env.read_txn()?;
273            entries
274                .get(&txn, &key)?
275                .ok_or_else(|| anyhow!("no docs found for {key}"))
276        })
277    }
278
279    pub fn load_many_by_prefix(&self, prefix: String) -> Task<Result<Vec<(String, MarkdownDocs)>>> {
280        let env = self.env.clone();
281        let entries = self.entries;
282
283        self.executor.spawn(async move {
284            let txn = env.read_txn()?;
285            let results = entries
286                .iter(&txn)?
287                .filter_map(|entry| {
288                    let (key, value) = entry.ok()?;
289                    if key.starts_with(&prefix) {
290                        Some((key, value))
291                    } else {
292                        None
293                    }
294                })
295                .collect::<Vec<_>>();
296
297            Ok(results)
298        })
299    }
300
301    /// Returns whether any entries exist with the given prefix.
302    pub fn any_with_prefix(&self, prefix: String) -> Task<Result<bool>> {
303        let env = self.env.clone();
304        let entries = self.entries;
305
306        self.executor.spawn(async move {
307            let txn = env.read_txn()?;
308            let any = entries
309                .iter(&txn)?
310                .any(|entry| entry.map_or(false, |(key, _value)| key.starts_with(&prefix)));
311            Ok(any)
312        })
313    }
314
315    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
316        let env = self.env.clone();
317        let entries = self.entries;
318
319        self.executor.spawn(async move {
320            let mut txn = env.write_txn()?;
321            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
322            txn.commit()?;
323            Ok(())
324        })
325    }
326}