store.rs

  1use std::path::PathBuf;
  2use std::sync::atomic::AtomicBool;
  3use std::sync::Arc;
  4
  5use anyhow::{anyhow, Result};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::future::{self, BoxFuture, Shared};
 10use futures::FutureExt;
 11use fuzzy::StringMatchCandidate;
 12use gpui::{AppContext, BackgroundExecutor, Task};
 13use heed::types::SerdeBincode;
 14use heed::Database;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
/// The ID of an indexed docs provider, as returned by [`IndexedDocsProvider::id`].
///
/// This is a newtype around a shared string (`Arc<str>`); the inner value is public.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
pub struct ProviderId(pub Arc<str>);
 23
/// The name of a package.
///
/// This is a newtype around a shared string (`Arc<str>`). [`IndexedDocsStore`] uses it as the
/// key for its per-package indexing tasks and per-package error messages.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
pub struct PackageName(Arc<str>);
 27
 28impl From<&str> for PackageName {
 29    fn from(value: &str) -> Self {
 30        Self(value.into())
 31    }
 32}
 33
/// A source of documentation whose packages can be indexed into an [`IndexedDocsDatabase`].
///
/// An [`IndexedDocsStore`] owns one provider: it opens the database at
/// [`database_path`](IndexedDocsProvider::database_path) and delegates the actual indexing work
/// to [`index`](IndexedDocsProvider::index).
#[async_trait]
pub trait IndexedDocsProvider {
    /// Returns the ID of this provider.
    fn id(&self) -> ProviderId;

    /// Returns the path to the database for this provider.
    fn database_path(&self) -> PathBuf;

    /// Indexes the package with the given name.
    ///
    /// `database` is the handle to this provider's opened database.
    /// NOTE(review): presumably implementations write the generated docs through
    /// `IndexedDocsDatabase::insert` — confirm against the concrete providers.
    ///
    /// # Errors
    ///
    /// Any error returned here is recorded by [`IndexedDocsStore::index`] as the latest error
    /// for `package`.
    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
}
 45
/// A store for indexed docs.
///
/// Pairs a single [`IndexedDocsProvider`] with its [`IndexedDocsDatabase`] and keeps track of
/// which packages are currently being indexed and which failed most recently.
pub struct IndexedDocsStore {
    /// Executor used to spawn the database-opening, indexing, and search tasks.
    executor: BackgroundExecutor,
    /// Shared future that resolves to the opened database, or to the error hit while opening it.
    /// It is cloned and awaited by every operation that needs the database.
    database_future:
        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
    /// The provider that performs the actual indexing.
    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
    /// Indexing tasks keyed by package. An entry is removed when its task finishes.
    indexing_tasks_by_package:
        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
    /// The message of the most recent indexing failure for each package. An entry is cleared
    /// when a later indexing run for that package succeeds.
    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
}
 56
 57impl IndexedDocsStore {
 58    pub fn try_global(provider: ProviderId, cx: &AppContext) -> Result<Arc<Self>> {
 59        let registry = IndexedDocsRegistry::global(cx);
 60        registry
 61            .get_provider_store(provider.clone())
 62            .ok_or_else(|| anyhow!("no indexed docs store found for {provider}"))
 63    }
 64
 65    pub fn new(
 66        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 67        executor: BackgroundExecutor,
 68    ) -> Self {
 69        let database_future = executor
 70            .spawn({
 71                let executor = executor.clone();
 72                let database_path = provider.database_path();
 73                async move { IndexedDocsDatabase::new(database_path, executor) }
 74            })
 75            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 76            .boxed()
 77            .shared();
 78
 79        Self {
 80            executor,
 81            database_future,
 82            provider,
 83            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 84            latest_errors_by_package: RwLock::new(HashMap::default()),
 85        }
 86    }
 87
 88    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 89        self.latest_errors_by_package.read().get(package).cloned()
 90    }
 91
 92    /// Returns whether the package with the given name is currently being indexed.
 93    pub fn is_indexing(&self, package: &PackageName) -> bool {
 94        self.indexing_tasks_by_package.read().contains_key(package)
 95    }
 96
 97    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
 98        self.database_future
 99            .clone()
100            .await
101            .map_err(|err| anyhow!(err))?
102            .load(key)
103            .await
104    }
105
106    pub async fn load_many_by_prefix(&self, prefix: String) -> Result<Vec<(String, MarkdownDocs)>> {
107        self.database_future
108            .clone()
109            .await
110            .map_err(|err| anyhow!(err))?
111            .load_many_by_prefix(prefix)
112            .await
113    }
114
115    pub fn index(
116        self: Arc<Self>,
117        package: PackageName,
118    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
119        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
120            return existing_task.clone();
121        }
122
123        let indexing_task = self
124            .executor
125            .spawn({
126                let this = self.clone();
127                let package = package.clone();
128                async move {
129                    let _finally = util::defer({
130                        let this = this.clone();
131                        let package = package.clone();
132                        move || {
133                            this.indexing_tasks_by_package.write().remove(&package);
134                        }
135                    });
136
137                    let index_task = {
138                        let package = package.clone();
139                        async {
140                            let database = this
141                                .database_future
142                                .clone()
143                                .await
144                                .map_err(|err| anyhow!(err))?;
145                            this.provider.index(package, database).await
146                        }
147                    };
148
149                    let result = index_task.await.map_err(Arc::new);
150                    match &result {
151                        Ok(_) => {
152                            this.latest_errors_by_package.write().remove(&package);
153                        }
154                        Err(err) => {
155                            this.latest_errors_by_package
156                                .write()
157                                .insert(package, err.to_string().into());
158                        }
159                    }
160
161                    result
162                }
163            })
164            .shared();
165
166        self.indexing_tasks_by_package
167            .write()
168            .insert(package, indexing_task.clone());
169
170        indexing_task
171    }
172
173    pub fn search(&self, query: String) -> Task<Vec<String>> {
174        let executor = self.executor.clone();
175        let database_future = self.database_future.clone();
176        self.executor.spawn(async move {
177            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
178                return Vec::new();
179            };
180
181            let Some(items) = database.keys().await.log_err() else {
182                return Vec::new();
183            };
184
185            let candidates = items
186                .iter()
187                .enumerate()
188                .map(|(ix, item_path)| StringMatchCandidate::new(ix, item_path.clone()))
189                .collect::<Vec<_>>();
190
191            let matches = fuzzy::match_strings(
192                &candidates,
193                &query,
194                false,
195                100,
196                &AtomicBool::default(),
197                executor,
198            )
199            .await;
200
201            matches
202                .into_iter()
203                .map(|mat| items[mat.candidate_id].clone())
204                .collect()
205        })
206    }
207}
208
/// The docs stored as a value in [`IndexedDocsDatabase`].
///
/// This is a newtype around `String`; presumably the text is Markdown, per the type's name.
#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
pub struct MarkdownDocs(pub String);
211
/// A heed-backed key/value database mapping `String` keys to [`MarkdownDocs`].
///
/// Every read and write is run as a task on the stored executor.
pub struct IndexedDocsDatabase {
    /// Executor on which all database reads and writes are spawned.
    executor: BackgroundExecutor,
    /// The heed environment the transactions are opened on.
    env: heed::Env,
    /// The database of entries; keys and values are both encoded with `SerdeBincode`.
    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
}
217
218impl IndexedDocsDatabase {
219    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
220        std::fs::create_dir_all(&path)?;
221
222        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
223        let env = unsafe {
224            heed::EnvOpenOptions::new()
225                .map_size(ONE_GB_IN_BYTES)
226                .max_dbs(1)
227                .open(path)?
228        };
229
230        let mut txn = env.write_txn()?;
231        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
232        txn.commit()?;
233
234        Ok(Self {
235            executor,
236            env,
237            entries,
238        })
239    }
240
241    pub fn keys(&self) -> Task<Result<Vec<String>>> {
242        let env = self.env.clone();
243        let entries = self.entries;
244
245        self.executor.spawn(async move {
246            let txn = env.read_txn()?;
247            let mut iter = entries.iter(&txn)?;
248            let mut keys = Vec::new();
249            while let Some((key, _value)) = iter.next().transpose()? {
250                keys.push(key);
251            }
252
253            Ok(keys)
254        })
255    }
256
257    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
258        let env = self.env.clone();
259        let entries = self.entries;
260
261        self.executor.spawn(async move {
262            let txn = env.read_txn()?;
263            entries
264                .get(&txn, &key)?
265                .ok_or_else(|| anyhow!("no docs found for {key}"))
266        })
267    }
268
269    pub fn load_many_by_prefix(&self, prefix: String) -> Task<Result<Vec<(String, MarkdownDocs)>>> {
270        let env = self.env.clone();
271        let entries = self.entries;
272
273        self.executor.spawn(async move {
274            let txn = env.read_txn()?;
275            let results = entries
276                .iter(&txn)?
277                .filter_map(|entry| {
278                    let (key, value) = entry.ok()?;
279                    if key.starts_with(&prefix) {
280                        Some((key, value))
281                    } else {
282                        None
283                    }
284                })
285                .collect::<Vec<_>>();
286
287            Ok(results)
288        })
289    }
290
291    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
292        let env = self.env.clone();
293        let entries = self.entries;
294
295        self.executor.spawn(async move {
296            let mut txn = env.write_txn()?;
297            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
298            txn.commit()?;
299            Ok(())
300        })
301    }
302}