store.rs

  1use std::path::PathBuf;
  2use std::sync::atomic::AtomicBool;
  3use std::sync::Arc;
  4
  5use anyhow::{anyhow, Result};
  6use async_trait::async_trait;
  7use collections::HashMap;
  8use derive_more::{Deref, Display};
  9use futures::future::{self, BoxFuture, Shared};
 10use futures::FutureExt;
 11use fuzzy::StringMatchCandidate;
 12use gpui::{AppContext, BackgroundExecutor, Task};
 13use heed::types::SerdeBincode;
 14use heed::Database;
 15use parking_lot::RwLock;
 16use serde::{Deserialize, Serialize};
 17use util::ResultExt;
 18
 19use crate::IndexedDocsRegistry;
 20
 21#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 22pub struct ProviderId(pub Arc<str>);
 23
 24/// The name of a package.
 25#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Deref, Display)]
 26pub struct PackageName(Arc<str>);
 27
 28impl From<&str> for PackageName {
 29    fn from(value: &str) -> Self {
 30        Self(value.into())
 31    }
 32}
 33
 34#[async_trait]
 35pub trait IndexedDocsProvider {
 36    /// Returns the ID of this provider.
 37    fn id(&self) -> ProviderId;
 38
 39    /// Returns the path to the database for this provider.
 40    fn database_path(&self) -> PathBuf;
 41
 42    /// Indexes the package with the given name.
 43    async fn index(&self, package: PackageName, database: Arc<IndexedDocsDatabase>) -> Result<()>;
 44}
 45
 46/// A store for indexed docs.
 47pub struct IndexedDocsStore {
 48    executor: BackgroundExecutor,
 49    database_future:
 50        Shared<BoxFuture<'static, Result<Arc<IndexedDocsDatabase>, Arc<anyhow::Error>>>>,
 51    provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 52    indexing_tasks_by_package:
 53        RwLock<HashMap<PackageName, Shared<Task<Result<(), Arc<anyhow::Error>>>>>>,
 54    latest_errors_by_package: RwLock<HashMap<PackageName, Arc<str>>>,
 55}
 56
 57impl IndexedDocsStore {
 58    pub fn try_global(provider: ProviderId, cx: &AppContext) -> Result<Arc<Self>> {
 59        let registry = IndexedDocsRegistry::global(cx);
 60        registry
 61            .get_provider_store(provider.clone())
 62            .ok_or_else(|| anyhow!("no indexed docs store found for {provider}"))
 63    }
 64
 65    pub fn new(
 66        provider: Box<dyn IndexedDocsProvider + Send + Sync + 'static>,
 67        executor: BackgroundExecutor,
 68    ) -> Self {
 69        let database_future = executor
 70            .spawn({
 71                let executor = executor.clone();
 72                let database_path = provider.database_path();
 73                async move { IndexedDocsDatabase::new(database_path, executor) }
 74            })
 75            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
 76            .boxed()
 77            .shared();
 78
 79        Self {
 80            executor,
 81            database_future,
 82            provider,
 83            indexing_tasks_by_package: RwLock::new(HashMap::default()),
 84            latest_errors_by_package: RwLock::new(HashMap::default()),
 85        }
 86    }
 87
 88    pub fn latest_error_for_package(&self, package: &PackageName) -> Option<Arc<str>> {
 89        self.latest_errors_by_package.read().get(package).cloned()
 90    }
 91
 92    /// Returns whether the package with the given name is currently being indexed.
 93    pub fn is_indexing(&self, package: &PackageName) -> bool {
 94        self.indexing_tasks_by_package.read().contains_key(package)
 95    }
 96
 97    pub async fn load(&self, key: String) -> Result<MarkdownDocs> {
 98        self.database_future
 99            .clone()
100            .await
101            .map_err(|err| anyhow!(err))?
102            .load(key)
103            .await
104    }
105
106    pub fn index(
107        self: Arc<Self>,
108        package: PackageName,
109    ) -> Shared<Task<Result<(), Arc<anyhow::Error>>>> {
110        if let Some(existing_task) = self.indexing_tasks_by_package.read().get(&package) {
111            return existing_task.clone();
112        }
113
114        let indexing_task = self
115            .executor
116            .spawn({
117                let this = self.clone();
118                let package = package.clone();
119                async move {
120                    let _finally = util::defer({
121                        let this = this.clone();
122                        let package = package.clone();
123                        move || {
124                            this.indexing_tasks_by_package.write().remove(&package);
125                        }
126                    });
127
128                    let index_task = {
129                        let package = package.clone();
130                        async {
131                            let database = this
132                                .database_future
133                                .clone()
134                                .await
135                                .map_err(|err| anyhow!(err))?;
136                            this.provider.index(package, database).await
137                        }
138                    };
139
140                    let result = index_task.await.map_err(Arc::new);
141                    match &result {
142                        Ok(_) => {
143                            this.latest_errors_by_package.write().remove(&package);
144                        }
145                        Err(err) => {
146                            this.latest_errors_by_package
147                                .write()
148                                .insert(package, err.to_string().into());
149                        }
150                    }
151
152                    result
153                }
154            })
155            .shared();
156
157        self.indexing_tasks_by_package
158            .write()
159            .insert(package, indexing_task.clone());
160
161        indexing_task
162    }
163
164    pub fn search(&self, query: String) -> Task<Vec<String>> {
165        let executor = self.executor.clone();
166        let database_future = self.database_future.clone();
167        self.executor.spawn(async move {
168            let Some(database) = database_future.await.map_err(|err| anyhow!(err)).log_err() else {
169                return Vec::new();
170            };
171
172            let Some(items) = database.keys().await.log_err() else {
173                return Vec::new();
174            };
175
176            let candidates = items
177                .iter()
178                .enumerate()
179                .map(|(ix, item_path)| StringMatchCandidate::new(ix, item_path.clone()))
180                .collect::<Vec<_>>();
181
182            let matches = fuzzy::match_strings(
183                &candidates,
184                &query,
185                false,
186                100,
187                &AtomicBool::default(),
188                executor,
189            )
190            .await;
191
192            matches
193                .into_iter()
194                .map(|mat| items[mat.candidate_id].clone())
195                .collect()
196        })
197    }
198}
199
200#[derive(Debug, PartialEq, Eq, Clone, Display, Serialize, Deserialize)]
201pub struct MarkdownDocs(pub String);
202
203pub struct IndexedDocsDatabase {
204    executor: BackgroundExecutor,
205    env: heed::Env,
206    entries: Database<SerdeBincode<String>, SerdeBincode<MarkdownDocs>>,
207}
208
209impl IndexedDocsDatabase {
210    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
211        std::fs::create_dir_all(&path)?;
212
213        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
214        let env = unsafe {
215            heed::EnvOpenOptions::new()
216                .map_size(ONE_GB_IN_BYTES)
217                .max_dbs(1)
218                .open(path)?
219        };
220
221        let mut txn = env.write_txn()?;
222        let entries = env.create_database(&mut txn, Some("rustdoc_entries"))?;
223        txn.commit()?;
224
225        Ok(Self {
226            executor,
227            env,
228            entries,
229        })
230    }
231
232    pub fn keys(&self) -> Task<Result<Vec<String>>> {
233        let env = self.env.clone();
234        let entries = self.entries;
235
236        self.executor.spawn(async move {
237            let txn = env.read_txn()?;
238            let mut iter = entries.iter(&txn)?;
239            let mut keys = Vec::new();
240            while let Some((key, _value)) = iter.next().transpose()? {
241                keys.push(key);
242            }
243
244            Ok(keys)
245        })
246    }
247
248    pub fn load(&self, key: String) -> Task<Result<MarkdownDocs>> {
249        let env = self.env.clone();
250        let entries = self.entries;
251
252        self.executor.spawn(async move {
253            let txn = env.read_txn()?;
254            entries
255                .get(&txn, &key)?
256                .ok_or_else(|| anyhow!("no docs found for {key}"))
257        })
258    }
259
260    pub fn insert(&self, key: String, docs: String) -> Task<Result<()>> {
261        let env = self.env.clone();
262        let entries = self.entries;
263
264        self.executor.spawn(async move {
265            let mut txn = env.write_txn()?;
266            entries.put(&mut txn, &key, &MarkdownDocs(docs))?;
267            txn.commit()?;
268            Ok(())
269        })
270    }
271}