thread_store.rs

  1use std::borrow::Cow;
  2use std::cell::{Ref, RefCell};
  3use std::path::{Path, PathBuf};
  4use std::rc::Rc;
  5use std::sync::Arc;
  6
  7use anyhow::{Context as _, Result, anyhow};
  8use assistant_settings::{AgentProfile, AgentProfileId, AssistantSettings};
  9use assistant_tool::{ToolId, ToolSource, ToolWorkingSet};
 10use chrono::{DateTime, Utc};
 11use collections::HashMap;
 12use context_server::manager::ContextServerManager;
 13use context_server::{ContextServerFactoryRegistry, ContextServerTool};
 14use fs::Fs;
 15use futures::FutureExt as _;
 16use futures::future::{self, BoxFuture, Shared};
 17use gpui::{
 18    App, BackgroundExecutor, Context, Entity, EventEmitter, Global, ReadGlobal, SharedString,
 19    Subscription, Task, prelude::*,
 20};
 21use heed::Database;
 22use heed::types::SerdeBincode;
 23use language_model::{LanguageModelToolUseId, Role, TokenUsage};
 24use project::{Project, Worktree};
 25use prompt_store::{ProjectContext, PromptBuilder, RulesFileContext, WorktreeContext};
 26use serde::{Deserialize, Serialize};
 27use settings::{Settings as _, SettingsStore};
 28use util::ResultExt as _;
 29
 30use crate::thread::{
 31    DetailedSummaryState, ExceededWindowError, MessageId, ProjectSnapshot, Thread, ThreadId,
 32};
 33
 34const RULES_FILE_NAMES: [&'static str; 6] = [
 35    ".rules",
 36    ".cursorrules",
 37    ".windsurfrules",
 38    ".clinerules",
 39    ".github/copilot-instructions.md",
 40    "CLAUDE.md",
 41];
 42
 43pub fn init(cx: &mut App) {
 44    ThreadsDatabase::init(cx);
 45}
 46
 47/// A system prompt shared by all threads created by this ThreadStore
 48#[derive(Clone, Default)]
 49pub struct SharedProjectContext(Rc<RefCell<Option<ProjectContext>>>);
 50
 51impl SharedProjectContext {
 52    pub fn borrow(&self) -> Ref<Option<ProjectContext>> {
 53        self.0.borrow()
 54    }
 55}
 56
 57pub struct ThreadStore {
 58    project: Entity<Project>,
 59    tools: Entity<ToolWorkingSet>,
 60    prompt_builder: Arc<PromptBuilder>,
 61    context_server_manager: Entity<ContextServerManager>,
 62    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
 63    threads: Vec<SerializedThreadMetadata>,
 64    project_context: SharedProjectContext,
 65    _subscriptions: Vec<Subscription>,
 66}
 67
 68pub struct RulesLoadingError {
 69    pub message: SharedString,
 70}
 71
 72impl EventEmitter<RulesLoadingError> for ThreadStore {}
 73
 74impl ThreadStore {
 75    pub fn load(
 76        project: Entity<Project>,
 77        tools: Entity<ToolWorkingSet>,
 78        prompt_builder: Arc<PromptBuilder>,
 79        cx: &mut App,
 80    ) -> Task<Entity<Self>> {
 81        let thread_store = cx.new(|cx| Self::new(project, tools, prompt_builder, cx));
 82        let reload = thread_store.update(cx, |store, cx| store.reload_system_prompt(cx));
 83        cx.foreground_executor().spawn(async move {
 84            reload.await;
 85            thread_store
 86        })
 87    }
 88
 89    fn new(
 90        project: Entity<Project>,
 91        tools: Entity<ToolWorkingSet>,
 92        prompt_builder: Arc<PromptBuilder>,
 93        cx: &mut Context<Self>,
 94    ) -> Self {
 95        let context_server_factory_registry = ContextServerFactoryRegistry::default_global(cx);
 96        let context_server_manager = cx.new(|cx| {
 97            ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
 98        });
 99        let settings_subscription =
100            cx.observe_global::<SettingsStore>(move |this: &mut Self, cx| {
101                this.load_default_profile(cx);
102            });
103        let project_subscription = cx.subscribe(&project, Self::handle_project_event);
104
105        let this = Self {
106            project,
107            tools,
108            prompt_builder,
109            context_server_manager,
110            context_server_tool_ids: HashMap::default(),
111            threads: Vec::new(),
112            project_context: SharedProjectContext::default(),
113            _subscriptions: vec![settings_subscription, project_subscription],
114        };
115        this.load_default_profile(cx);
116        this.register_context_server_handlers(cx);
117        this.reload(cx).detach_and_log_err(cx);
118        this
119    }
120
121    fn handle_project_event(
122        &mut self,
123        _project: Entity<Project>,
124        event: &project::Event,
125        cx: &mut Context<Self>,
126    ) {
127        match event {
128            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
129                self.reload_system_prompt(cx).detach();
130            }
131            project::Event::WorktreeUpdatedEntries(_, items) => {
132                if items.iter().any(|(path, _, _)| {
133                    RULES_FILE_NAMES
134                        .iter()
135                        .any(|name| path.as_ref() == Path::new(name))
136                }) {
137                    self.reload_system_prompt(cx).detach();
138                }
139            }
140            _ => {}
141        }
142    }
143
144    pub fn reload_system_prompt(&self, cx: &mut Context<Self>) -> Task<()> {
145        let project = self.project.read(cx);
146        let tasks = project
147            .visible_worktrees(cx)
148            .map(|worktree| {
149                Self::load_worktree_info_for_system_prompt(
150                    project.fs().clone(),
151                    worktree.read(cx),
152                    cx,
153                )
154            })
155            .collect::<Vec<_>>();
156
157        cx.spawn(async move |this, cx| {
158            let results = futures::future::join_all(tasks).await;
159            let worktrees = results
160                .into_iter()
161                .map(|(worktree, rules_error)| {
162                    if let Some(rules_error) = rules_error {
163                        this.update(cx, |_, cx| cx.emit(rules_error)).ok();
164                    }
165                    worktree
166                })
167                .collect::<Vec<_>>();
168            this.update(cx, |this, _cx| {
169                *this.project_context.0.borrow_mut() = Some(ProjectContext::new(worktrees));
170            })
171            .ok();
172        })
173    }
174
175    fn load_worktree_info_for_system_prompt(
176        fs: Arc<dyn Fs>,
177        worktree: &Worktree,
178        cx: &App,
179    ) -> Task<(WorktreeContext, Option<RulesLoadingError>)> {
180        let root_name = worktree.root_name().into();
181        let abs_path = worktree.abs_path();
182
183        let rules_task = Self::load_worktree_rules_file(fs, worktree, cx);
184        let Some(rules_task) = rules_task else {
185            return Task::ready((
186                WorktreeContext {
187                    root_name,
188                    abs_path,
189                    rules_file: None,
190                },
191                None,
192            ));
193        };
194
195        cx.spawn(async move |_| {
196            let (rules_file, rules_file_error) = match rules_task.await {
197                Ok(rules_file) => (Some(rules_file), None),
198                Err(err) => (
199                    None,
200                    Some(RulesLoadingError {
201                        message: format!("{err}").into(),
202                    }),
203                ),
204            };
205            let worktree_info = WorktreeContext {
206                root_name,
207                abs_path,
208                rules_file,
209            };
210            (worktree_info, rules_file_error)
211        })
212    }
213
214    fn load_worktree_rules_file(
215        fs: Arc<dyn Fs>,
216        worktree: &Worktree,
217        cx: &App,
218    ) -> Option<Task<Result<RulesFileContext>>> {
219        let selected_rules_file = RULES_FILE_NAMES
220            .into_iter()
221            .filter_map(|name| {
222                worktree
223                    .entry_for_path(name)
224                    .filter(|entry| entry.is_file())
225                    .map(|entry| (entry.path.clone(), worktree.absolutize(&entry.path)))
226            })
227            .next();
228
229        // Note that Cline supports `.clinerules` being a directory, but that is not currently
230        // supported. This doesn't seem to occur often in GitHub repositories.
231        selected_rules_file.map(|(path_in_worktree, abs_path)| {
232            let fs = fs.clone();
233            cx.background_spawn(async move {
234                let abs_path = abs_path?;
235                let text = fs.load(&abs_path).await.with_context(|| {
236                    format!("Failed to load assistant rules file {:?}", abs_path)
237                })?;
238                anyhow::Ok(RulesFileContext {
239                    path_in_worktree,
240                    abs_path: abs_path.into(),
241                    text: text.trim().to_string(),
242                })
243            })
244        })
245    }
246
247    pub fn context_server_manager(&self) -> Entity<ContextServerManager> {
248        self.context_server_manager.clone()
249    }
250
251    pub fn tools(&self) -> Entity<ToolWorkingSet> {
252        self.tools.clone()
253    }
254
255    /// Returns the number of threads.
256    pub fn thread_count(&self) -> usize {
257        self.threads.len()
258    }
259
260    pub fn threads(&self) -> Vec<SerializedThreadMetadata> {
261        let mut threads = self.threads.iter().cloned().collect::<Vec<_>>();
262        threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.updated_at));
263        threads
264    }
265
266    pub fn recent_threads(&self, limit: usize) -> Vec<SerializedThreadMetadata> {
267        self.threads().into_iter().take(limit).collect()
268    }
269
270    pub fn create_thread(&mut self, cx: &mut Context<Self>) -> Entity<Thread> {
271        cx.new(|cx| {
272            Thread::new(
273                self.project.clone(),
274                self.tools.clone(),
275                self.prompt_builder.clone(),
276                self.project_context.clone(),
277                cx,
278            )
279        })
280    }
281
282    pub fn open_thread(
283        &self,
284        id: &ThreadId,
285        cx: &mut Context<Self>,
286    ) -> Task<Result<Entity<Thread>>> {
287        let id = id.clone();
288        let database_future = ThreadsDatabase::global_future(cx);
289        cx.spawn(async move |this, cx| {
290            let database = database_future.await.map_err(|err| anyhow!(err))?;
291            let thread = database
292                .try_find_thread(id.clone())
293                .await?
294                .ok_or_else(|| anyhow!("no thread found with ID: {id:?}"))?;
295
296            let thread = this.update(cx, |this, cx| {
297                cx.new(|cx| {
298                    Thread::deserialize(
299                        id.clone(),
300                        thread,
301                        this.project.clone(),
302                        this.tools.clone(),
303                        this.prompt_builder.clone(),
304                        this.project_context.clone(),
305                        cx,
306                    )
307                })
308            })?;
309
310            Ok(thread)
311        })
312    }
313
314    pub fn save_thread(&self, thread: &Entity<Thread>, cx: &mut Context<Self>) -> Task<Result<()>> {
315        let (metadata, serialized_thread) =
316            thread.update(cx, |thread, cx| (thread.id().clone(), thread.serialize(cx)));
317
318        let database_future = ThreadsDatabase::global_future(cx);
319        cx.spawn(async move |this, cx| {
320            let serialized_thread = serialized_thread.await?;
321            let database = database_future.await.map_err(|err| anyhow!(err))?;
322            database.save_thread(metadata, serialized_thread).await?;
323
324            this.update(cx, |this, cx| this.reload(cx))?.await
325        })
326    }
327
328    pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut Context<Self>) -> Task<Result<()>> {
329        let id = id.clone();
330        let database_future = ThreadsDatabase::global_future(cx);
331        cx.spawn(async move |this, cx| {
332            let database = database_future.await.map_err(|err| anyhow!(err))?;
333            database.delete_thread(id.clone()).await?;
334
335            this.update(cx, |this, cx| {
336                this.threads.retain(|thread| thread.id != id);
337                cx.notify();
338            })
339        })
340    }
341
342    pub fn reload(&self, cx: &mut Context<Self>) -> Task<Result<()>> {
343        let database_future = ThreadsDatabase::global_future(cx);
344        cx.spawn(async move |this, cx| {
345            let threads = database_future
346                .await
347                .map_err(|err| anyhow!(err))?
348                .list_threads()
349                .await?;
350
351            this.update(cx, |this, cx| {
352                this.threads = threads;
353                cx.notify();
354            })
355        })
356    }
357
358    fn load_default_profile(&self, cx: &mut Context<Self>) {
359        let assistant_settings = AssistantSettings::get_global(cx);
360
361        self.load_profile_by_id(assistant_settings.default_profile.clone(), cx);
362    }
363
364    pub fn load_profile_by_id(&self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
365        let assistant_settings = AssistantSettings::get_global(cx);
366
367        if let Some(profile) = assistant_settings.profiles.get(&profile_id) {
368            self.load_profile(profile.clone(), cx);
369        }
370    }
371
372    pub fn load_profile(&self, profile: AgentProfile, cx: &mut Context<Self>) {
373        self.tools.update(cx, |tools, cx| {
374            tools.disable_all_tools(cx);
375            tools.enable(
376                ToolSource::Native,
377                &profile
378                    .tools
379                    .iter()
380                    .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
381                    .collect::<Vec<_>>(),
382                cx,
383            );
384        });
385
386        if profile.enable_all_context_servers {
387            for context_server in self.context_server_manager.read(cx).all_servers() {
388                self.tools.update(cx, |tools, cx| {
389                    tools.enable_source(
390                        ToolSource::ContextServer {
391                            id: context_server.id().into(),
392                        },
393                        cx,
394                    );
395                });
396            }
397        } else {
398            for (context_server_id, preset) in &profile.context_servers {
399                self.tools.update(cx, |tools, cx| {
400                    tools.enable(
401                        ToolSource::ContextServer {
402                            id: context_server_id.clone().into(),
403                        },
404                        &preset
405                            .tools
406                            .iter()
407                            .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
408                            .collect::<Vec<_>>(),
409                        cx,
410                    )
411                })
412            }
413        }
414    }
415
416    fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
417        cx.subscribe(
418            &self.context_server_manager.clone(),
419            Self::handle_context_server_event,
420        )
421        .detach();
422    }
423
424    fn handle_context_server_event(
425        &mut self,
426        context_server_manager: Entity<ContextServerManager>,
427        event: &context_server::manager::Event,
428        cx: &mut Context<Self>,
429    ) {
430        let tool_working_set = self.tools.clone();
431        match event {
432            context_server::manager::Event::ServerStarted { server_id } => {
433                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
434                    let context_server_manager = context_server_manager.clone();
435                    cx.spawn({
436                        let server = server.clone();
437                        let server_id = server_id.clone();
438                        async move |this, cx| {
439                            let Some(protocol) = server.client() else {
440                                return;
441                            };
442
443                            if protocol.capable(context_server::protocol::ServerCapability::Tools) {
444                                if let Some(tools) = protocol.list_tools().await.log_err() {
445                                    let tool_ids = tool_working_set
446                                        .update(cx, |tool_working_set, _| {
447                                            tools
448                                                .tools
449                                                .into_iter()
450                                                .map(|tool| {
451                                                    log::info!(
452                                                        "registering context server tool: {:?}",
453                                                        tool.name
454                                                    );
455                                                    tool_working_set.insert(Arc::new(
456                                                        ContextServerTool::new(
457                                                            context_server_manager.clone(),
458                                                            server.id(),
459                                                            tool,
460                                                        ),
461                                                    ))
462                                                })
463                                                .collect::<Vec<_>>()
464                                        })
465                                        .log_err();
466
467                                    if let Some(tool_ids) = tool_ids {
468                                        this.update(cx, |this, cx| {
469                                            this.context_server_tool_ids
470                                                .insert(server_id, tool_ids);
471                                            this.load_default_profile(cx);
472                                        })
473                                        .log_err();
474                                    }
475                                }
476                            }
477                        }
478                    })
479                    .detach();
480                }
481            }
482            context_server::manager::Event::ServerStopped { server_id } => {
483                if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
484                    tool_working_set.update(cx, |tool_working_set, _| {
485                        tool_working_set.remove(&tool_ids);
486                    });
487                    self.load_default_profile(cx);
488                }
489            }
490        }
491    }
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize)]
495pub struct SerializedThreadMetadata {
496    pub id: ThreadId,
497    pub summary: SharedString,
498    pub updated_at: DateTime<Utc>,
499}
500
501#[derive(Serialize, Deserialize, Debug)]
502pub struct SerializedThread {
503    pub version: String,
504    pub summary: SharedString,
505    pub updated_at: DateTime<Utc>,
506    pub messages: Vec<SerializedMessage>,
507    #[serde(default)]
508    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
509    #[serde(default)]
510    pub cumulative_token_usage: TokenUsage,
511    #[serde(default)]
512    pub detailed_summary_state: DetailedSummaryState,
513    #[serde(default)]
514    pub exceeded_window_error: Option<ExceededWindowError>,
515}
516
517impl SerializedThread {
518    pub const VERSION: &'static str = "0.1.0";
519
520    pub fn from_json(json: &[u8]) -> Result<Self> {
521        let saved_thread_json = serde_json::from_slice::<serde_json::Value>(json)?;
522        match saved_thread_json.get("version") {
523            Some(serde_json::Value::String(version)) => match version.as_str() {
524                SerializedThread::VERSION => Ok(serde_json::from_value::<SerializedThread>(
525                    saved_thread_json,
526                )?),
527                _ => Err(anyhow!(
528                    "unrecognized serialized thread version: {}",
529                    version
530                )),
531            },
532            None => {
533                let saved_thread =
534                    serde_json::from_value::<LegacySerializedThread>(saved_thread_json)?;
535                Ok(saved_thread.upgrade())
536            }
537            version => Err(anyhow!(
538                "unrecognized serialized thread version: {:?}",
539                version
540            )),
541        }
542    }
543}
544
545#[derive(Debug, Serialize, Deserialize)]
546pub struct SerializedMessage {
547    pub id: MessageId,
548    pub role: Role,
549    #[serde(default)]
550    pub segments: Vec<SerializedMessageSegment>,
551    #[serde(default)]
552    pub tool_uses: Vec<SerializedToolUse>,
553    #[serde(default)]
554    pub tool_results: Vec<SerializedToolResult>,
555    #[serde(default)]
556    pub context: String,
557}
558
559#[derive(Debug, Serialize, Deserialize)]
560#[serde(tag = "type")]
561pub enum SerializedMessageSegment {
562    #[serde(rename = "text")]
563    Text { text: String },
564    #[serde(rename = "thinking")]
565    Thinking { text: String },
566}
567
568#[derive(Debug, Serialize, Deserialize)]
569pub struct SerializedToolUse {
570    pub id: LanguageModelToolUseId,
571    pub name: SharedString,
572    pub input: serde_json::Value,
573}
574
575#[derive(Debug, Serialize, Deserialize)]
576pub struct SerializedToolResult {
577    pub tool_use_id: LanguageModelToolUseId,
578    pub is_error: bool,
579    pub content: Arc<str>,
580}
581
582#[derive(Serialize, Deserialize)]
583struct LegacySerializedThread {
584    pub summary: SharedString,
585    pub updated_at: DateTime<Utc>,
586    pub messages: Vec<LegacySerializedMessage>,
587    #[serde(default)]
588    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
589}
590
591impl LegacySerializedThread {
592    pub fn upgrade(self) -> SerializedThread {
593        SerializedThread {
594            version: SerializedThread::VERSION.to_string(),
595            summary: self.summary,
596            updated_at: self.updated_at,
597            messages: self.messages.into_iter().map(|msg| msg.upgrade()).collect(),
598            initial_project_snapshot: self.initial_project_snapshot,
599            cumulative_token_usage: TokenUsage::default(),
600            detailed_summary_state: DetailedSummaryState::default(),
601            exceeded_window_error: None,
602        }
603    }
604}
605
606#[derive(Debug, Serialize, Deserialize)]
607struct LegacySerializedMessage {
608    pub id: MessageId,
609    pub role: Role,
610    pub text: String,
611    #[serde(default)]
612    pub tool_uses: Vec<SerializedToolUse>,
613    #[serde(default)]
614    pub tool_results: Vec<SerializedToolResult>,
615}
616
617impl LegacySerializedMessage {
618    fn upgrade(self) -> SerializedMessage {
619        SerializedMessage {
620            id: self.id,
621            role: self.role,
622            segments: vec![SerializedMessageSegment::Text { text: self.text }],
623            tool_uses: self.tool_uses,
624            tool_results: self.tool_results,
625            context: String::new(),
626        }
627    }
628}
629
630struct GlobalThreadsDatabase(
631    Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>>,
632);
633
634impl Global for GlobalThreadsDatabase {}
635
636pub(crate) struct ThreadsDatabase {
637    executor: BackgroundExecutor,
638    env: heed::Env,
639    threads: Database<SerdeBincode<ThreadId>, SerializedThread>,
640}
641
642impl heed::BytesEncode<'_> for SerializedThread {
643    type EItem = SerializedThread;
644
645    fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, heed::BoxedError> {
646        serde_json::to_vec(item).map(Cow::Owned).map_err(Into::into)
647    }
648}
649
650impl<'a> heed::BytesDecode<'a> for SerializedThread {
651    type DItem = SerializedThread;
652
653    fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {
654        // We implement this type manually because we want to call `SerializedThread::from_json`,
655        // instead of the Deserialize trait implementation for `SerializedThread`.
656        SerializedThread::from_json(bytes).map_err(Into::into)
657    }
658}
659
660impl ThreadsDatabase {
661    fn global_future(
662        cx: &mut App,
663    ) -> Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>> {
664        GlobalThreadsDatabase::global(cx).0.clone()
665    }
666
667    fn init(cx: &mut App) {
668        let executor = cx.background_executor().clone();
669        let database_future = executor
670            .spawn({
671                let executor = executor.clone();
672                let database_path = paths::data_dir().join("threads/threads-db.1.mdb");
673                async move { ThreadsDatabase::new(database_path, executor) }
674            })
675            .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
676            .boxed()
677            .shared();
678
679        cx.set_global(GlobalThreadsDatabase(database_future));
680    }
681
682    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
683        std::fs::create_dir_all(&path)?;
684
685        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
686        let env = unsafe {
687            heed::EnvOpenOptions::new()
688                .map_size(ONE_GB_IN_BYTES)
689                .max_dbs(1)
690                .open(path)?
691        };
692
693        let mut txn = env.write_txn()?;
694        let threads = env.create_database(&mut txn, Some("threads"))?;
695        txn.commit()?;
696
697        Ok(Self {
698            executor,
699            env,
700            threads,
701        })
702    }
703
704    pub fn list_threads(&self) -> Task<Result<Vec<SerializedThreadMetadata>>> {
705        let env = self.env.clone();
706        let threads = self.threads;
707
708        self.executor.spawn(async move {
709            let txn = env.read_txn()?;
710            let mut iter = threads.iter(&txn)?;
711            let mut threads = Vec::new();
712            while let Some((key, value)) = iter.next().transpose()? {
713                threads.push(SerializedThreadMetadata {
714                    id: key,
715                    summary: value.summary,
716                    updated_at: value.updated_at,
717                });
718            }
719
720            Ok(threads)
721        })
722    }
723
724    pub fn try_find_thread(&self, id: ThreadId) -> Task<Result<Option<SerializedThread>>> {
725        let env = self.env.clone();
726        let threads = self.threads;
727
728        self.executor.spawn(async move {
729            let txn = env.read_txn()?;
730            let thread = threads.get(&txn, &id)?;
731            Ok(thread)
732        })
733    }
734
735    pub fn save_thread(&self, id: ThreadId, thread: SerializedThread) -> Task<Result<()>> {
736        let env = self.env.clone();
737        let threads = self.threads;
738
739        self.executor.spawn(async move {
740            let mut txn = env.write_txn()?;
741            threads.put(&mut txn, &id, &thread)?;
742            txn.commit()?;
743            Ok(())
744        })
745    }
746
747    pub fn delete_thread(&self, id: ThreadId) -> Task<Result<()>> {
748        let env = self.env.clone();
749        let threads = self.threads;
750
751        self.executor.spawn(async move {
752            let mut txn = env.write_txn()?;
753            threads.delete(&mut txn, &id)?;
754            txn.commit()?;
755            Ok(())
756        })
757    }
758}