store.rs

  1use std::path::PathBuf;
  2use std::sync::atomic::AtomicBool;
  3use std::sync::Arc;
  4
  5use anyhow::{anyhow, Result};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::future::{self, BoxFuture, Shared};
 10use futures::FutureExt;
 11use fuzzy::StringMatchCandidate;
 12use gpui::{AppContext, BackgroundExecutor, Task};
 13use heed::types::SerdeBincode;
 14use heed::Database;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
 21#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 22pub struct ProviderId(pub Arc<str>);
 23
 24impl ProviderId {
 25    pub fn rustdoc() -> Self {
 26        Self("rustdoc".into())
 27    }
 28}
 29
 30/// The name of a package.
 31#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 32pub struct PackageName(Arc<str>);
 33
 34impl From<&str> for PackageName {
 35    fn from(value: &str) -> Self {
 36        Self(value.into())
 37    }
 38}
 39
 40#[async_trait]
 41pub trait IndexedDocsProvider {
 42    /// Returns the ID of this provider.
 43    fn id(&self) -> ProviderId;
 44
 45    /// Returns the path to the database for this provider.
 46    fn database_path(&self) -> PathBuf;
 47
 48    /// Indexes the package with the given name.
 49    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
 50}
 51
 52/// A store for indexed docs.
 53pub struct IndexedDocsStore {
 54    executor: BackgroundExecutor,
 55    database_future:
 56        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
 57    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 58    indexing_tasks_by_package:
 59        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
 60    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
 61}
 62
 63impl IndexedDocsStore {
 64    pub fn try_global(provider: ProviderId, cx: &AppContext) -> Result<Arc<Self>> {
 65        let registry = IndexedDocsRegistry::global(cx);
 66        registry
 67            .get_provider_store(provider.clone())
 68            .ok_or_else(|| anyhow!("no indexed docs store found for {provider}"))
 69    }
 70
 71    pub fn new(
 72        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 73        executor: BackgroundExecutor,
 74    ) -> Self {
 75        let database_future = executor
 76            .spawn({
 77                let executor = executor.clone();
 78                let database_path = provider.database_path();
 79                async move { IndexedDocsDatabase::new(database_path, executor) }
 80            })
 81            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 82            .boxed()
 83            .shared();
 84
 85        Self {
 86            executor,
 87            database_future,
 88            provider,
 89            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 90            latest_errors_by_package: RwLock::new(HashMap::default()),
 91        }
 92    }
 93
 94    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 95        self.latest_errors_by_package.read().get(package).cloned()
 96    }
 97
 98    /// Returns whether the package with the given name is currently being indexed.
 99    pub fn is_indexing(&self, package: &PackageName) -> bool {
100        self.indexing_tasks_by_package.read().contains_key(package)
101    }
102
103    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
104        self.database_future
105            .clone()
106            .await
107            .map_err(|err| anyhow!(err))?
108            .load(key)
109            .await
110    }
111
112    pub fn index(
113        self: Arc<Self>,
114        package: PackageName,
115    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
116        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
117            return existing_task.clone();
118        }
119
120        let indexing_task = self
121            .executor
122            .spawn({
123                let this = self.clone();
124                let package = package.clone();
125                async move {
126                    let _finally = util::defer({
127                        let this = this.clone();
128                        let package = package.clone();
129                        move || {
130                            this.indexing_tasks_by_package.write().remove(&package);
131                        }
132                    });
133
134                    let index_task = {
135                        let package = package.clone();
136                        async {
137                            let database = this
138                                .database_future
139                                .clone()
140                                .await
141                                .map_err(|err| anyhow!(err))?;
142                            this.provider.index(package, database).await
143                        }
144                    };
145
146                    let result = index_task.await.map_err(Arc::new);
147                    match &result {
148                        Ok(_) => {
149                            this.latest_errors_by_package.write().remove(&package);
150                        }
151                        Err(err) => {
152                            this.latest_errors_by_package
153                                .write()
154                                .insert(package, err.to_string().into());
155                        }
156                    }
157
158                    result
159                }
160            })
161            .shared();
162
163        self.indexing_tasks_by_package
164            .write()
165            .insert(package, indexing_task.clone());
166
167        indexing_task
168    }
169
170    pub fn search(&self, query: String) -> Task<Vec<String>> {
171        let executor = self.executor.clone();
172        let database_future = self.database_future.clone();
173        self.executor.spawn(async move {
174            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
175                return Vec::new();
176            };
177
178            let Some(items) = database.keys().await.log_err() else {
179                return Vec::new();
180            };
181
182            let candidates = items
183                .iter()
184                .enumerate()
185                .map(|(ix, item_path)| StringMatchCandidate::new(ix, item_path.clone()))
186                .collect::<Vec<_>>();
187
188            let matches = fuzzy::match_strings(
189                &candidates,
190                &query,
191                false,
192                100,
193                &AtomicBool::default(),
194                executor,
195            )
196            .await;
197
198            matches
199                .into_iter()
200                .map(|mat| items[mat.candidate_id].clone())
201                .collect()
202        })
203    }
204}
205
206#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
207pub struct MarkdownDocs(pub String);
208
209pub struct IndexedDocsDatabase {
210    executor: BackgroundExecutor,
211    env: heed::Env,
212    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
213}
214
215impl IndexedDocsDatabase {
216    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
217        std::fs::create_dir_all(&path)?;
218
219        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
220        let env = unsafe {
221            heed::EnvOpenOptions::new()
222                .map_size(ONE_GB_IN_BYTES)
223                .max_dbs(1)
224                .open(path)?
225        };
226
227        let mut txn = env.write_txn()?;
228        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
229        txn.commit()?;
230
231        Ok(Self {
232            executor,
233            env,
234            entries,
235        })
236    }
237
238    pub fn keys(&self) -> Task<Result<Vec<String>>> {
239        let env = self.env.clone();
240        let entries = self.entries;
241
242        self.executor.spawn(async move {
243            let txn = env.read_txn()?;
244            let mut iter = entries.iter(&txn)?;
245            let mut keys = Vec::new();
246            while let Some((key, _value)) = iter.next().transpose()? {
247                keys.push(key);
248            }
249
250            Ok(keys)
251        })
252    }
253
254    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
255        let env = self.env.clone();
256        let entries = self.entries;
257
258        self.executor.spawn(async move {
259            let txn = env.read_txn()?;
260            entries
261                .get(&txn, &key)?
262                .ok_or_else(|| anyhow!("no docs found for {key}"))
263        })
264    }
265
266    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
267        let env = self.env.clone();
268        let entries = self.entries;
269
270        self.executor.spawn(async move {
271            let mut txn = env.write_txn()?;
272            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
273            txn.commit()?;
274            Ok(())
275        })
276    }
277}