thread_store.rs

  1use std::borrow::Cow;
  2use std::cell::{Ref, RefCell};
  3use std::path::{Path, PathBuf};
  4use std::rc::Rc;
  5use std::sync::Arc;
  6
  7use anyhow::{Context as _, Result, anyhow};
  8use assistant_settings::{AgentProfile, AgentProfileId, AssistantSettings};
  9use assistant_tool::{ToolId, ToolSource, ToolWorkingSet};
 10use chrono::{DateTime, Utc};
 11use collections::HashMap;
 12use context_server::manager::ContextServerManager;
 13use context_server::{ContextServerFactoryRegistry, ContextServerTool};
 14use fs::Fs;
 15use futures::FutureExt as _;
 16use futures::future::{self, BoxFuture, Shared};
 17use gpui::{
 18    App, BackgroundExecutor, Context, Entity, EventEmitter, Global, ReadGlobal, SharedString,
 19    Subscription, Task, prelude::*,
 20};
 21use heed::Database;
 22use heed::types::SerdeBincode;
 23use language_model::{LanguageModelToolUseId, Role, TokenUsage};
 24use project::{Project, Worktree};
 25use prompt_store::{ProjectContext, PromptBuilder, RulesFileContext, WorktreeContext};
 26use serde::{Deserialize, Serialize};
 27use settings::{Settings as _, SettingsStore};
 28use util::ResultExt as _;
 29
 30use crate::thread::{
 31    DetailedSummaryState, ExceededWindowError, MessageId, ProjectSnapshot, Thread, ThreadId,
 32};
 33
 34const RULES_FILE_NAMES: [&'static str; 6] = [
 35    ".rules",
 36    ".cursorrules",
 37    ".windsurfrules",
 38    ".clinerules",
 39    ".github/copilot-instructions.md",
 40    "CLAUDE.md",
 41];
 42
 43pub fn init(cx: &mut App) {
 44    ThreadsDatabase::init(cx);
 45}
 46
/// Project context shared by all threads created by one [`ThreadStore`].
///
/// The inner value starts out as `None` (via `Default`) and is replaced each time the store
/// reloads the system prompt. Clones share the same underlying cell.
#[derive(Clone, Default)]
pub struct SharedProjectContext(Rc<RefCell<Option<ProjectContext>>>);
 50
 51impl SharedProjectContext {
 52    pub fn borrow(&self) -> Ref<Option<ProjectContext>> {
 53        self.0.borrow()
 54    }
 55}
 56
/// Keeps the metadata of saved threads for a project and creates, opens, saves and deletes
/// [`Thread`]s backed by the global threads database.
pub struct ThreadStore {
    /// Project whose visible worktrees supply the rules files and project context.
    project: Entity<Project>,
    /// Tool working set handed to every thread this store creates or opens.
    tools: Entity<ToolWorkingSet>,
    prompt_builder: Arc<PromptBuilder>,
    context_server_manager: Entity<ContextServerManager>,
    /// Tool ids registered for each started context server, keyed by server id, so they can
    /// be removed again when that server stops.
    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
    /// Thread metadata as of the last `reload` (or in-memory deletion).
    threads: Vec<SerializedThreadMetadata>,
    /// Project context passed to each thread; replaced by `reload_system_prompt`.
    project_context: SharedProjectContext,
    /// Holds the settings and project subscriptions created in `new`.
    _subscriptions: Vec<Subscription>,
}
 67
/// Event emitted by [`ThreadStore`] when loading a worktree's rules file fails.
pub struct RulesLoadingError {
    /// Display text of the underlying error.
    pub message: SharedString,
}

// Lets subscribers of a `ThreadStore` receive `RulesLoadingError` events.
impl EventEmitter<RulesLoadingError> for ThreadStore {}
 73
 74impl ThreadStore {
 75    pub fn load(
 76        project: Entity<Project>,
 77        tools: Entity<ToolWorkingSet>,
 78        prompt_builder: Arc<PromptBuilder>,
 79        cx: &mut App,
 80    ) -> Task<Entity<Self>> {
 81        let thread_store = cx.new(|cx| Self::new(project, tools, prompt_builder, cx));
 82        let reload = thread_store.update(cx, |store, cx| store.reload_system_prompt(cx));
 83        cx.foreground_executor().spawn(async move {
 84            reload.await;
 85            thread_store
 86        })
 87    }
 88
 89    fn new(
 90        project: Entity<Project>,
 91        tools: Entity<ToolWorkingSet>,
 92        prompt_builder: Arc<PromptBuilder>,
 93        cx: &mut Context<Self>,
 94    ) -> Self {
 95        let context_server_factory_registry = ContextServerFactoryRegistry::default_global(cx);
 96        let context_server_manager = cx.new(|cx| {
 97            ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
 98        });
 99        let settings_subscription =
100            cx.observe_global::<SettingsStore>(move |this: &mut Self, cx| {
101                this.load_default_profile(cx);
102            });
103        let project_subscription = cx.subscribe(&project, Self::handle_project_event);
104
105        let this = Self {
106            project,
107            tools,
108            prompt_builder,
109            context_server_manager,
110            context_server_tool_ids: HashMap::default(),
111            threads: Vec::new(),
112            project_context: SharedProjectContext::default(),
113            _subscriptions: vec![settings_subscription, project_subscription],
114        };
115        this.load_default_profile(cx);
116        this.register_context_server_handlers(cx);
117        this.reload(cx).detach_and_log_err(cx);
118        this
119    }
120
121    fn handle_project_event(
122        &mut self,
123        _project: Entity<Project>,
124        event: &project::Event,
125        cx: &mut Context<Self>,
126    ) {
127        match event {
128            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
129                self.reload_system_prompt(cx).detach();
130            }
131            project::Event::WorktreeUpdatedEntries(_, items) => {
132                if items.iter().any(|(path, _, _)| {
133                    RULES_FILE_NAMES
134                        .iter()
135                        .any(|name| path.as_ref() == Path::new(name))
136                }) {
137                    self.reload_system_prompt(cx).detach();
138                }
139            }
140            _ => {}
141        }
142    }
143
144    pub fn reload_system_prompt(&self, cx: &mut Context<Self>) -> Task<()> {
145        let project = self.project.read(cx);
146        let tasks = project
147            .visible_worktrees(cx)
148            .map(|worktree| {
149                Self::load_worktree_info_for_system_prompt(
150                    project.fs().clone(),
151                    worktree.read(cx),
152                    cx,
153                )
154            })
155            .collect::<Vec<_>>();
156
157        cx.spawn(async move |this, cx| {
158            let results = futures::future::join_all(tasks).await;
159            let worktrees = results
160                .into_iter()
161                .map(|(worktree, rules_error)| {
162                    if let Some(rules_error) = rules_error {
163                        this.update(cx, |_, cx| cx.emit(rules_error)).ok();
164                    }
165                    worktree
166                })
167                .collect::<Vec<_>>();
168            this.update(cx, |this, _cx| {
169                *this.project_context.0.borrow_mut() = Some(ProjectContext::new(worktrees));
170            })
171            .ok();
172        })
173    }
174
175    fn load_worktree_info_for_system_prompt(
176        fs: Arc<dyn Fs>,
177        worktree: &Worktree,
178        cx: &App,
179    ) -> Task<(WorktreeContext, Option<RulesLoadingError>)> {
180        let root_name = worktree.root_name().into();
181        let abs_path = worktree.abs_path();
182
183        let rules_task = Self::load_worktree_rules_file(fs, worktree, cx);
184        let Some(rules_task) = rules_task else {
185            return Task::ready((
186                WorktreeContext {
187                    root_name,
188                    abs_path,
189                    rules_file: None,
190                },
191                None,
192            ));
193        };
194
195        cx.spawn(async move |_| {
196            let (rules_file, rules_file_error) = match rules_task.await {
197                Ok(rules_file) => (Some(rules_file), None),
198                Err(err) => (
199                    None,
200                    Some(RulesLoadingError {
201                        message: format!("{err}").into(),
202                    }),
203                ),
204            };
205            let worktree_info = WorktreeContext {
206                root_name,
207                abs_path,
208                rules_file,
209            };
210            (worktree_info, rules_file_error)
211        })
212    }
213
214    fn load_worktree_rules_file(
215        fs: Arc<dyn Fs>,
216        worktree: &Worktree,
217        cx: &App,
218    ) -> Option<Task<Result<RulesFileContext>>> {
219        let selected_rules_file = RULES_FILE_NAMES
220            .into_iter()
221            .filter_map(|name| {
222                worktree
223                    .entry_for_path(name)
224                    .filter(|entry| entry.is_file())
225                    .map(|entry| (entry.path.clone(), worktree.absolutize(&entry.path)))
226            })
227            .next();
228
229        // Note that Cline supports `.clinerules` being a directory, but that is not currently
230        // supported. This doesn't seem to occur often in GitHub repositories.
231        selected_rules_file.map(|(path_in_worktree, abs_path)| {
232            let fs = fs.clone();
233            cx.background_spawn(async move {
234                let abs_path = abs_path?;
235                let text = fs.load(&abs_path).await.with_context(|| {
236                    format!("Failed to load assistant rules file {:?}", abs_path)
237                })?;
238                anyhow::Ok(RulesFileContext {
239                    path_in_worktree,
240                    abs_path: abs_path.into(),
241                    text: text.trim().to_string(),
242                })
243            })
244        })
245    }
246
247    pub fn context_server_manager(&self) -> Entity<ContextServerManager> {
248        self.context_server_manager.clone()
249    }
250
251    pub fn tools(&self) -> Entity<ToolWorkingSet> {
252        self.tools.clone()
253    }
254
    /// Returns the number of threads whose metadata is currently held by this store.
    ///
    /// This counts the in-memory list, which is refreshed by `reload`; it does not query
    /// the database.
    pub fn thread_count(&self) -> usize {
        self.threads.len()
    }
259
260    pub fn threads(&self) -> Vec<SerializedThreadMetadata> {
261        let mut threads = self.threads.iter().cloned().collect::<Vec<_>>();
262        threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.updated_at));
263        threads
264    }
265
266    pub fn recent_threads(&self, limit: usize) -> Vec<SerializedThreadMetadata> {
267        self.threads().into_iter().take(limit).collect()
268    }
269
270    pub fn create_thread(&mut self, cx: &mut Context<Self>) -> Entity<Thread> {
271        cx.new(|cx| {
272            Thread::new(
273                self.project.clone(),
274                self.tools.clone(),
275                self.prompt_builder.clone(),
276                self.project_context.clone(),
277                cx,
278            )
279        })
280    }
281
282    pub fn open_thread(
283        &self,
284        id: &ThreadId,
285        cx: &mut Context<Self>,
286    ) -> Task<Result<Entity<Thread>>> {
287        let id = id.clone();
288        let database_future = ThreadsDatabase::global_future(cx);
289        cx.spawn(async move |this, cx| {
290            let database = database_future.await.map_err(|err| anyhow!(err))?;
291            let thread = database
292                .try_find_thread(id.clone())
293                .await?
294                .ok_or_else(|| anyhow!("no thread found with ID: {id:?}"))?;
295
296            let thread = this.update(cx, |this, cx| {
297                cx.new(|cx| {
298                    Thread::deserialize(
299                        id.clone(),
300                        thread,
301                        this.project.clone(),
302                        this.tools.clone(),
303                        this.prompt_builder.clone(),
304                        this.project_context.clone(),
305                        cx,
306                    )
307                })
308            })?;
309
310            Ok(thread)
311        })
312    }
313
314    pub fn save_thread(&self, thread: &Entity<Thread>, cx: &mut Context<Self>) -> Task<Result<()>> {
315        let (metadata, serialized_thread) =
316            thread.update(cx, |thread, cx| (thread.id().clone(), thread.serialize(cx)));
317
318        let database_future = ThreadsDatabase::global_future(cx);
319        cx.spawn(async move |this, cx| {
320            let serialized_thread = serialized_thread.await?;
321            let database = database_future.await.map_err(|err| anyhow!(err))?;
322            database.save_thread(metadata, serialized_thread).await?;
323
324            this.update(cx, |this, cx| this.reload(cx))?.await
325        })
326    }
327
328    pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut Context<Self>) -> Task<Result<()>> {
329        let id = id.clone();
330        let database_future = ThreadsDatabase::global_future(cx);
331        cx.spawn(async move |this, cx| {
332            let database = database_future.await.map_err(|err| anyhow!(err))?;
333            database.delete_thread(id.clone()).await?;
334
335            this.update(cx, |this, cx| {
336                this.threads.retain(|thread| thread.id != id);
337                cx.notify();
338            })
339        })
340    }
341
342    pub fn reload(&self, cx: &mut Context<Self>) -> Task<Result<()>> {
343        let database_future = ThreadsDatabase::global_future(cx);
344        cx.spawn(async move |this, cx| {
345            let threads = database_future
346                .await
347                .map_err(|err| anyhow!(err))?
348                .list_threads()
349                .await?;
350
351            this.update(cx, |this, cx| {
352                this.threads = threads;
353                cx.notify();
354            })
355        })
356    }
357
358    fn load_default_profile(&self, cx: &mut Context<Self>) {
359        let assistant_settings = AssistantSettings::get_global(cx);
360
361        self.load_profile_by_id(assistant_settings.default_profile.clone(), cx);
362    }
363
364    pub fn load_profile_by_id(&self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
365        let assistant_settings = AssistantSettings::get_global(cx);
366
367        if let Some(profile) = assistant_settings.profiles.get(&profile_id) {
368            self.load_profile(profile.clone(), cx);
369        }
370    }
371
372    pub fn load_profile(&self, profile: AgentProfile, cx: &mut Context<Self>) {
373        self.tools.update(cx, |tools, cx| {
374            tools.disable_all_tools(cx);
375            tools.enable(
376                ToolSource::Native,
377                &profile
378                    .tools
379                    .iter()
380                    .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
381                    .collect::<Vec<_>>(),
382                cx,
383            );
384        });
385
386        if profile.enable_all_context_servers {
387            for context_server in self.context_server_manager.read(cx).all_servers() {
388                self.tools.update(cx, |tools, cx| {
389                    tools.enable_source(
390                        ToolSource::ContextServer {
391                            id: context_server.id().into(),
392                        },
393                        cx,
394                    );
395                });
396            }
397        } else {
398            for (context_server_id, preset) in &profile.context_servers {
399                self.tools.update(cx, |tools, cx| {
400                    tools.enable(
401                        ToolSource::ContextServer {
402                            id: context_server_id.clone().into(),
403                        },
404                        &preset
405                            .tools
406                            .iter()
407                            .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
408                            .collect::<Vec<_>>(),
409                        cx,
410                    )
411                })
412            }
413        }
414    }
415
416    fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
417        cx.subscribe(
418            &self.context_server_manager.clone(),
419            Self::handle_context_server_event,
420        )
421        .detach();
422    }
423
424    fn handle_context_server_event(
425        &mut self,
426        context_server_manager: Entity<ContextServerManager>,
427        event: &context_server::manager::Event,
428        cx: &mut Context<Self>,
429    ) {
430        let tool_working_set = self.tools.clone();
431        match event {
432            context_server::manager::Event::ServerStarted { server_id } => {
433                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
434                    let context_server_manager = context_server_manager.clone();
435                    cx.spawn({
436                        let server = server.clone();
437                        let server_id = server_id.clone();
438                        async move |this, cx| {
439                            let Some(protocol) = server.client() else {
440                                return;
441                            };
442
443                            if protocol.capable(context_server::protocol::ServerCapability::Tools) {
444                                if let Some(tools) = protocol.list_tools().await.log_err() {
445                                    let tool_ids = tool_working_set
446                                        .update(cx, |tool_working_set, _| {
447                                            tools
448                                                .tools
449                                                .into_iter()
450                                                .map(|tool| {
451                                                    log::info!(
452                                                        "registering context server tool: {:?}",
453                                                        tool.name
454                                                    );
455                                                    tool_working_set.insert(Arc::new(
456                                                        ContextServerTool::new(
457                                                            context_server_manager.clone(),
458                                                            server.id(),
459                                                            tool,
460                                                        ),
461                                                    ))
462                                                })
463                                                .collect::<Vec<_>>()
464                                        })
465                                        .log_err();
466
467                                    if let Some(tool_ids) = tool_ids {
468                                        this.update(cx, |this, cx| {
469                                            this.context_server_tool_ids
470                                                .insert(server_id, tool_ids);
471                                            this.load_default_profile(cx);
472                                        })
473                                        .log_err();
474                                    }
475                                }
476                            }
477                        }
478                    })
479                    .detach();
480                }
481            }
482            context_server::manager::Event::ServerStopped { server_id } => {
483                if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
484                    tool_working_set.update(cx, |tool_working_set, _| {
485                        tool_working_set.remove(&tool_ids);
486                    });
487                    self.load_default_profile(cx);
488                }
489            }
490        }
491    }
492}
493
/// Lightweight summary of a stored thread, used for listing threads without keeping their
/// messages in memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedThreadMetadata {
    /// Key under which the thread is stored in the database.
    pub id: ThreadId,
    pub summary: SharedString,
    /// Timestamp used to order threads from most to least recent.
    pub updated_at: DateTime<Utc>,
}
500
/// Persisted form of a thread.
///
/// Fields marked `#[serde(default)]` may be missing from stored data, in which case they
/// take their `Default` value when deserialized.
#[derive(Serialize, Deserialize, Debug)]
pub struct SerializedThread {
    /// Format version; `from_json` only accepts [`SerializedThread::VERSION`] here.
    pub version: String,
    pub summary: SharedString,
    pub updated_at: DateTime<Utc>,
    pub messages: Vec<SerializedMessage>,
    #[serde(default)]
    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
    #[serde(default)]
    pub cumulative_token_usage: TokenUsage,
    #[serde(default)]
    pub request_token_usage: Vec<TokenUsage>,
    #[serde(default)]
    pub detailed_summary_state: DetailedSummaryState,
    #[serde(default)]
    pub exceeded_window_error: Option<ExceededWindowError>,
}
518
519impl SerializedThread {
520    pub const VERSION: &'static str = "0.1.0";
521
522    pub fn from_json(json: &[u8]) -> Result<Self> {
523        let saved_thread_json = serde_json::from_slice::<serde_json::Value>(json)?;
524        match saved_thread_json.get("version") {
525            Some(serde_json::Value::String(version)) => match version.as_str() {
526                SerializedThread::VERSION => Ok(serde_json::from_value::<SerializedThread>(
527                    saved_thread_json,
528                )?),
529                _ => Err(anyhow!(
530                    "unrecognized serialized thread version: {}",
531                    version
532                )),
533            },
534            None => {
535                let saved_thread =
536                    serde_json::from_value::<LegacySerializedThread>(saved_thread_json)?;
537                Ok(saved_thread.upgrade())
538            }
539            version => Err(anyhow!(
540                "unrecognized serialized thread version: {:?}",
541                version
542            )),
543        }
544    }
545}
546
/// Persisted form of a single message in a thread.
///
/// Every field except `id` and `role` falls back to its default when absent from stored
/// data.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedMessage {
    pub id: MessageId,
    pub role: Role,
    #[serde(default)]
    pub segments: Vec<SerializedMessageSegment>,
    #[serde(default)]
    pub tool_uses: Vec<SerializedToolUse>,
    #[serde(default)]
    pub tool_results: Vec<SerializedToolResult>,
    #[serde(default)]
    pub context: String,
}
560
/// One segment of a message's content.
///
/// Serialized as an object with a `type` field of `"text"` or `"thinking"` alongside the
/// segment's `text`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SerializedMessageSegment {
    #[serde(rename = "text")]
    Text { text: String },
    #[serde(rename = "thinking")]
    Thinking { text: String },
}
569
/// Persisted record of a tool invocation attached to a message.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedToolUse {
    pub id: LanguageModelToolUseId,
    pub name: SharedString,
    /// Tool input, kept as arbitrary JSON.
    pub input: serde_json::Value,
}
576
/// Persisted result of a tool invocation attached to a message.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedToolResult {
    /// Id of the tool use this result belongs to.
    pub tool_use_id: LanguageModelToolUseId,
    pub is_error: bool,
    pub content: Arc<str>,
}
583
/// Thread format that predates the `version` field.
///
/// `SerializedThread::from_json` parses stored data as this type when no `version` key is
/// present and converts it with `upgrade`.
#[derive(Serialize, Deserialize)]
struct LegacySerializedThread {
    pub summary: SharedString,
    pub updated_at: DateTime<Utc>,
    pub messages: Vec<LegacySerializedMessage>,
    #[serde(default)]
    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
}
592
593impl LegacySerializedThread {
594    pub fn upgrade(self) -> SerializedThread {
595        SerializedThread {
596            version: SerializedThread::VERSION.to_string(),
597            summary: self.summary,
598            updated_at: self.updated_at,
599            messages: self.messages.into_iter().map(|msg| msg.upgrade()).collect(),
600            initial_project_snapshot: self.initial_project_snapshot,
601            cumulative_token_usage: TokenUsage::default(),
602            request_token_usage: Vec::new(),
603            detailed_summary_state: DetailedSummaryState::default(),
604            exceeded_window_error: None,
605        }
606    }
607}
608
/// Message format used by [`LegacySerializedThread`]: a single `text` body instead of
/// segments.
#[derive(Debug, Serialize, Deserialize)]
struct LegacySerializedMessage {
    pub id: MessageId,
    pub role: Role,
    pub text: String,
    #[serde(default)]
    pub tool_uses: Vec<SerializedToolUse>,
    #[serde(default)]
    pub tool_results: Vec<SerializedToolResult>,
}
619
620impl LegacySerializedMessage {
621    fn upgrade(self) -> SerializedMessage {
622        SerializedMessage {
623            id: self.id,
624            role: self.role,
625            segments: vec![SerializedMessageSegment::Text { text: self.text }],
626            tool_uses: self.tool_uses,
627            tool_results: self.tool_results,
628            context: String::new(),
629        }
630    }
631}
632
/// App-global wrapper around the shared future that resolves to the opened
/// [`ThreadsDatabase`], or to the error produced while opening it.
struct GlobalThreadsDatabase(
    Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>>,
);

impl Global for GlobalThreadsDatabase {}
638
/// heed-backed store of serialized threads keyed by [`ThreadId`].
pub(crate) struct ThreadsDatabase {
    /// Executor on which every database operation is spawned.
    executor: BackgroundExecutor,
    env: heed::Env,
    /// Keys are bincode-encoded thread ids; values are threads encoded as JSON (see the
    /// `BytesEncode`/`BytesDecode` impls for `SerializedThread`).
    threads: Database<SerdeBincode<ThreadId>, SerializedThread>,
}
644
645impl heed::BytesEncode<'_> for SerializedThread {
646    type EItem = SerializedThread;
647
648    fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, heed::BoxedError> {
649        serde_json::to_vec(item).map(Cow::Owned).map_err(Into::into)
650    }
651}
652
653impl<'a> heed::BytesDecode<'a> for SerializedThread {
654    type DItem = SerializedThread;
655
656    fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {
657        // We implement this type manually because we want to call `SerializedThread::from_json`,
658        // instead of the Deserialize trait implementation for `SerializedThread`.
659        SerializedThread::from_json(bytes).map_err(Into::into)
660    }
661}
662
663impl ThreadsDatabase {
664    fn global_future(
665        cx: &mut App,
666    ) -> Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>> {
667        GlobalThreadsDatabase::global(cx).0.clone()
668    }
669
670    fn init(cx: &mut App) {
671        let executor = cx.background_executor().clone();
672        let database_future = executor
673            .spawn({
674                let executor = executor.clone();
675                let database_path = paths::data_dir().join("threads/threads-db.1.mdb");
676                async move { ThreadsDatabase::new(database_path, executor) }
677            })
678            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
679            .boxed()
680            .shared();
681
682        cx.set_global(GlobalThreadsDatabase(database_future));
683    }
684
685    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
686        std::fs::create_dir_all(&path)?;
687
688        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
689        let env = unsafe {
690            heed::EnvOpenOptions::new()
691                .map_size(ONE_GB_IN_BYTES)
692                .max_dbs(1)
693                .open(path)?
694        };
695
696        let mut txn = env.write_txn()?;
697        let threads = env.create_database(&mut txn, Some("threads"))?;
698        txn.commit()?;
699
700        Ok(Self {
701            executor,
702            env,
703            threads,
704        })
705    }
706
707    pub fn list_threads(&self) -> Task<Result<Vec<SerializedThreadMetadata>>> {
708        let env = self.env.clone();
709        let threads = self.threads;
710
711        self.executor.spawn(async move {
712            let txn = env.read_txn()?;
713            let mut iter = threads.iter(&txn)?;
714            let mut threads = Vec::new();
715            while let Some((key, value)) = iter.next().transpose()? {
716                threads.push(SerializedThreadMetadata {
717                    id: key,
718                    summary: value.summary,
719                    updated_at: value.updated_at,
720                });
721            }
722
723            Ok(threads)
724        })
725    }
726
727    pub fn try_find_thread(&self, id: ThreadId) -> Task<Result<Option<SerializedThread>>> {
728        let env = self.env.clone();
729        let threads = self.threads;
730
731        self.executor.spawn(async move {
732            let txn = env.read_txn()?;
733            let thread = threads.get(&txn, &id)?;
734            Ok(thread)
735        })
736    }
737
738    pub fn save_thread(&self, id: ThreadId, thread: SerializedThread) -> Task<Result<()>> {
739        let env = self.env.clone();
740        let threads = self.threads;
741
742        self.executor.spawn(async move {
743            let mut txn = env.write_txn()?;
744            threads.put(&mut txn, &id, &thread)?;
745            txn.commit()?;
746            Ok(())
747        })
748    }
749
750    pub fn delete_thread(&self, id: ThreadId) -> Task<Result<()>> {
751        let env = self.env.clone();
752        let threads = self.threads;
753
754        self.executor.spawn(async move {
755            let mut txn = env.write_txn()?;
756            threads.delete(&mut txn, &id)?;
757            txn.commit()?;
758            Ok(())
759        })
760    }
761}