1use std::borrow::Cow;
2use std::cell::{Ref, RefCell};
3use std::path::{Path, PathBuf};
4use std::rc::Rc;
5use std::sync::Arc;
6
7use anyhow::{Context as _, Result, anyhow};
8use assistant_settings::{AgentProfile, AgentProfileId, AssistantSettings};
9use assistant_tool::{ToolId, ToolSource, ToolWorkingSet};
10use chrono::{DateTime, Utc};
11use collections::HashMap;
12use context_server::manager::ContextServerManager;
13use context_server::{ContextServerFactoryRegistry, ContextServerTool};
14use fs::Fs;
15use futures::FutureExt as _;
16use futures::future::{self, BoxFuture, Shared};
17use gpui::{
18 App, BackgroundExecutor, Context, Entity, EventEmitter, Global, ReadGlobal, SharedString,
19 Subscription, Task, prelude::*,
20};
21use heed::Database;
22use heed::types::SerdeBincode;
23use language_model::{LanguageModelToolUseId, Role, TokenUsage};
24use project::{Project, Worktree};
25use prompt_store::{ProjectContext, PromptBuilder, RulesFileContext, WorktreeContext};
26use serde::{Deserialize, Serialize};
27use settings::{Settings as _, SettingsStore};
28use util::ResultExt as _;
29
30use crate::thread::{
31 DetailedSummaryState, ExceededWindowError, MessageId, ProjectSnapshot, Thread, ThreadId,
32};
33
/// Worktree-relative paths that are probed for an assistant rules file.
///
/// Order matters: `ThreadStore::load_worktree_rules_file` walks this list from the
/// front and uses the first entry that exists as a regular file in the worktree.
/// The `'static` lifetime is implied for references in a `const`, so it is not
/// spelled out.
const RULES_FILE_NAMES: [&str; 6] = [
    ".rules",
    ".cursorrules",
    ".windsurfrules",
    ".clinerules",
    ".github/copilot-instructions.md",
    "CLAUDE.md",
];
42
/// Initializes this module's global state.
///
/// Delegates to `ThreadsDatabase::init`, which installs the shared threads-database
/// future as a global on `cx`.
pub fn init(cx: &mut App) {
    ThreadsDatabase::init(cx);
}
46
/// The project context shared by all threads created by this `ThreadStore`.
///
/// The value lives behind an `Rc<RefCell<..>>`, so every clone is a handle to the same
/// cell: a context written by the store is visible through all clones. The cell starts
/// out as `None` (via `Default`) and is filled in by `ThreadStore::reload_system_prompt`.
#[derive(Clone, Default)]
pub struct SharedProjectContext(Rc<RefCell<Option<ProjectContext>>>);
50
51impl SharedProjectContext {
52 pub fn borrow(&self) -> Ref<Option<ProjectContext>> {
53 self.0.borrow()
54 }
55}
56
/// Owns the list of saved threads for a project, together with the tools, context
/// servers and project context that newly created or opened threads are wired up with.
pub struct ThreadStore {
    /// Project whose visible worktrees are scanned to build the project context.
    project: Entity<Project>,
    /// Working set of tools; profiles enable/disable entries in it.
    tools: Entity<ToolWorkingSet>,
    /// Handed to every `Thread` this store creates or deserializes.
    prompt_builder: Arc<PromptBuilder>,
    /// Manager created in `new`; its start/stop events drive tool registration.
    context_server_manager: Entity<ContextServerManager>,
    /// Tool ids registered per context server id, kept so they can be removed again
    /// when that server stops.
    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
    /// Metadata of the saved threads, as last loaded from the database by `reload`.
    threads: Vec<SerializedThreadMetadata>,
    /// Shared cell handed to every thread; rewritten by `reload_system_prompt`.
    project_context: SharedProjectContext,
    /// Settings and project-event subscriptions created in `new`, stored for the
    /// lifetime of the store.
    _subscriptions: Vec<Subscription>,
}
67
/// Event emitted by `ThreadStore` when a worktree's rules file could not be loaded.
pub struct RulesLoadingError {
    /// Display text of the underlying error.
    pub message: SharedString,
}

// Lets `ThreadStore` emit `RulesLoadingError` events (see `reload_system_prompt`).
impl EventEmitter<RulesLoadingError> for ThreadStore {}
73
74impl ThreadStore {
75 pub fn load(
76 project: Entity<Project>,
77 tools: Entity<ToolWorkingSet>,
78 prompt_builder: Arc<PromptBuilder>,
79 cx: &mut App,
80 ) -> Task<Entity<Self>> {
81 let thread_store = cx.new(|cx| Self::new(project, tools, prompt_builder, cx));
82 let reload = thread_store.update(cx, |store, cx| store.reload_system_prompt(cx));
83 cx.foreground_executor().spawn(async move {
84 reload.await;
85 thread_store
86 })
87 }
88
89 fn new(
90 project: Entity<Project>,
91 tools: Entity<ToolWorkingSet>,
92 prompt_builder: Arc<PromptBuilder>,
93 cx: &mut Context<Self>,
94 ) -> Self {
95 let context_server_factory_registry = ContextServerFactoryRegistry::default_global(cx);
96 let context_server_manager = cx.new(|cx| {
97 ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
98 });
99 let settings_subscription =
100 cx.observe_global::<SettingsStore>(move |this: &mut Self, cx| {
101 this.load_default_profile(cx);
102 });
103 let project_subscription = cx.subscribe(&project, Self::handle_project_event);
104
105 let this = Self {
106 project,
107 tools,
108 prompt_builder,
109 context_server_manager,
110 context_server_tool_ids: HashMap::default(),
111 threads: Vec::new(),
112 project_context: SharedProjectContext::default(),
113 _subscriptions: vec![settings_subscription, project_subscription],
114 };
115 this.load_default_profile(cx);
116 this.register_context_server_handlers(cx);
117 this.reload(cx).detach_and_log_err(cx);
118 this
119 }
120
121 fn handle_project_event(
122 &mut self,
123 _project: Entity<Project>,
124 event: &project::Event,
125 cx: &mut Context<Self>,
126 ) {
127 match event {
128 project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
129 self.reload_system_prompt(cx).detach();
130 }
131 project::Event::WorktreeUpdatedEntries(_, items) => {
132 if items.iter().any(|(path, _, _)| {
133 RULES_FILE_NAMES
134 .iter()
135 .any(|name| path.as_ref() == Path::new(name))
136 }) {
137 self.reload_system_prompt(cx).detach();
138 }
139 }
140 _ => {}
141 }
142 }
143
144 pub fn reload_system_prompt(&self, cx: &mut Context<Self>) -> Task<()> {
145 let project = self.project.read(cx);
146 let tasks = project
147 .visible_worktrees(cx)
148 .map(|worktree| {
149 Self::load_worktree_info_for_system_prompt(
150 project.fs().clone(),
151 worktree.read(cx),
152 cx,
153 )
154 })
155 .collect::<Vec<_>>();
156
157 cx.spawn(async move |this, cx| {
158 let results = futures::future::join_all(tasks).await;
159 let worktrees = results
160 .into_iter()
161 .map(|(worktree, rules_error)| {
162 if let Some(rules_error) = rules_error {
163 this.update(cx, |_, cx| cx.emit(rules_error)).ok();
164 }
165 worktree
166 })
167 .collect::<Vec<_>>();
168 this.update(cx, |this, _cx| {
169 *this.project_context.0.borrow_mut() = Some(ProjectContext::new(worktrees));
170 })
171 .ok();
172 })
173 }
174
175 fn load_worktree_info_for_system_prompt(
176 fs: Arc<dyn Fs>,
177 worktree: &Worktree,
178 cx: &App,
179 ) -> Task<(WorktreeContext, Option<RulesLoadingError>)> {
180 let root_name = worktree.root_name().into();
181 let abs_path = worktree.abs_path();
182
183 let rules_task = Self::load_worktree_rules_file(fs, worktree, cx);
184 let Some(rules_task) = rules_task else {
185 return Task::ready((
186 WorktreeContext {
187 root_name,
188 abs_path,
189 rules_file: None,
190 },
191 None,
192 ));
193 };
194
195 cx.spawn(async move |_| {
196 let (rules_file, rules_file_error) = match rules_task.await {
197 Ok(rules_file) => (Some(rules_file), None),
198 Err(err) => (
199 None,
200 Some(RulesLoadingError {
201 message: format!("{err}").into(),
202 }),
203 ),
204 };
205 let worktree_info = WorktreeContext {
206 root_name,
207 abs_path,
208 rules_file,
209 };
210 (worktree_info, rules_file_error)
211 })
212 }
213
214 fn load_worktree_rules_file(
215 fs: Arc<dyn Fs>,
216 worktree: &Worktree,
217 cx: &App,
218 ) -> Option<Task<Result<RulesFileContext>>> {
219 let selected_rules_file = RULES_FILE_NAMES
220 .into_iter()
221 .filter_map(|name| {
222 worktree
223 .entry_for_path(name)
224 .filter(|entry| entry.is_file())
225 .map(|entry| (entry.path.clone(), worktree.absolutize(&entry.path)))
226 })
227 .next();
228
229 // Note that Cline supports `.clinerules` being a directory, but that is not currently
230 // supported. This doesn't seem to occur often in GitHub repositories.
231 selected_rules_file.map(|(path_in_worktree, abs_path)| {
232 let fs = fs.clone();
233 cx.background_spawn(async move {
234 let abs_path = abs_path?;
235 let text = fs.load(&abs_path).await.with_context(|| {
236 format!("Failed to load assistant rules file {:?}", abs_path)
237 })?;
238 anyhow::Ok(RulesFileContext {
239 path_in_worktree,
240 abs_path: abs_path.into(),
241 text: text.trim().to_string(),
242 })
243 })
244 })
245 }
246
    /// Returns a handle to the context server manager owned by this store.
    pub fn context_server_manager(&self) -> Entity<ContextServerManager> {
        self.context_server_manager.clone()
    }
250
    /// Returns a handle to the tool working set used by this store's threads.
    pub fn tools(&self) -> Entity<ToolWorkingSet> {
        self.tools.clone()
    }
254
    /// Returns the number of threads in the most recently loaded thread list.
    pub fn thread_count(&self) -> usize {
        self.threads.len()
    }
259
260 pub fn threads(&self) -> Vec<SerializedThreadMetadata> {
261 let mut threads = self.threads.iter().cloned().collect::<Vec<_>>();
262 threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.updated_at));
263 threads
264 }
265
266 pub fn recent_threads(&self, limit: usize) -> Vec<SerializedThreadMetadata> {
267 self.threads().into_iter().take(limit).collect()
268 }
269
270 pub fn create_thread(&mut self, cx: &mut Context<Self>) -> Entity<Thread> {
271 cx.new(|cx| {
272 Thread::new(
273 self.project.clone(),
274 self.tools.clone(),
275 self.prompt_builder.clone(),
276 self.project_context.clone(),
277 cx,
278 )
279 })
280 }
281
282 pub fn open_thread(
283 &self,
284 id: &ThreadId,
285 cx: &mut Context<Self>,
286 ) -> Task<Result<Entity<Thread>>> {
287 let id = id.clone();
288 let database_future = ThreadsDatabase::global_future(cx);
289 cx.spawn(async move |this, cx| {
290 let database = database_future.await.map_err(|err| anyhow!(err))?;
291 let thread = database
292 .try_find_thread(id.clone())
293 .await?
294 .ok_or_else(|| anyhow!("no thread found with ID: {id:?}"))?;
295
296 let thread = this.update(cx, |this, cx| {
297 cx.new(|cx| {
298 Thread::deserialize(
299 id.clone(),
300 thread,
301 this.project.clone(),
302 this.tools.clone(),
303 this.prompt_builder.clone(),
304 this.project_context.clone(),
305 cx,
306 )
307 })
308 })?;
309
310 Ok(thread)
311 })
312 }
313
314 pub fn save_thread(&self, thread: &Entity<Thread>, cx: &mut Context<Self>) -> Task<Result<()>> {
315 let (metadata, serialized_thread) =
316 thread.update(cx, |thread, cx| (thread.id().clone(), thread.serialize(cx)));
317
318 let database_future = ThreadsDatabase::global_future(cx);
319 cx.spawn(async move |this, cx| {
320 let serialized_thread = serialized_thread.await?;
321 let database = database_future.await.map_err(|err| anyhow!(err))?;
322 database.save_thread(metadata, serialized_thread).await?;
323
324 this.update(cx, |this, cx| this.reload(cx))?.await
325 })
326 }
327
328 pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut Context<Self>) -> Task<Result<()>> {
329 let id = id.clone();
330 let database_future = ThreadsDatabase::global_future(cx);
331 cx.spawn(async move |this, cx| {
332 let database = database_future.await.map_err(|err| anyhow!(err))?;
333 database.delete_thread(id.clone()).await?;
334
335 this.update(cx, |this, cx| {
336 this.threads.retain(|thread| thread.id != id);
337 cx.notify();
338 })
339 })
340 }
341
342 pub fn reload(&self, cx: &mut Context<Self>) -> Task<Result<()>> {
343 let database_future = ThreadsDatabase::global_future(cx);
344 cx.spawn(async move |this, cx| {
345 let threads = database_future
346 .await
347 .map_err(|err| anyhow!(err))?
348 .list_threads()
349 .await?;
350
351 this.update(cx, |this, cx| {
352 this.threads = threads;
353 cx.notify();
354 })
355 })
356 }
357
358 fn load_default_profile(&self, cx: &mut Context<Self>) {
359 let assistant_settings = AssistantSettings::get_global(cx);
360
361 self.load_profile_by_id(assistant_settings.default_profile.clone(), cx);
362 }
363
364 pub fn load_profile_by_id(&self, profile_id: AgentProfileId, cx: &mut Context<Self>) {
365 let assistant_settings = AssistantSettings::get_global(cx);
366
367 if let Some(profile) = assistant_settings.profiles.get(&profile_id) {
368 self.load_profile(profile.clone(), cx);
369 }
370 }
371
372 pub fn load_profile(&self, profile: AgentProfile, cx: &mut Context<Self>) {
373 self.tools.update(cx, |tools, cx| {
374 tools.disable_all_tools(cx);
375 tools.enable(
376 ToolSource::Native,
377 &profile
378 .tools
379 .iter()
380 .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
381 .collect::<Vec<_>>(),
382 cx,
383 );
384 });
385
386 if profile.enable_all_context_servers {
387 for context_server in self.context_server_manager.read(cx).all_servers() {
388 self.tools.update(cx, |tools, cx| {
389 tools.enable_source(
390 ToolSource::ContextServer {
391 id: context_server.id().into(),
392 },
393 cx,
394 );
395 });
396 }
397 } else {
398 for (context_server_id, preset) in &profile.context_servers {
399 self.tools.update(cx, |tools, cx| {
400 tools.enable(
401 ToolSource::ContextServer {
402 id: context_server_id.clone().into(),
403 },
404 &preset
405 .tools
406 .iter()
407 .filter_map(|(tool, enabled)| enabled.then(|| tool.clone()))
408 .collect::<Vec<_>>(),
409 cx,
410 )
411 })
412 }
413 }
414 }
415
416 fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
417 cx.subscribe(
418 &self.context_server_manager.clone(),
419 Self::handle_context_server_event,
420 )
421 .detach();
422 }
423
424 fn handle_context_server_event(
425 &mut self,
426 context_server_manager: Entity<ContextServerManager>,
427 event: &context_server::manager::Event,
428 cx: &mut Context<Self>,
429 ) {
430 let tool_working_set = self.tools.clone();
431 match event {
432 context_server::manager::Event::ServerStarted { server_id } => {
433 if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
434 let context_server_manager = context_server_manager.clone();
435 cx.spawn({
436 let server = server.clone();
437 let server_id = server_id.clone();
438 async move |this, cx| {
439 let Some(protocol) = server.client() else {
440 return;
441 };
442
443 if protocol.capable(context_server::protocol::ServerCapability::Tools) {
444 if let Some(tools) = protocol.list_tools().await.log_err() {
445 let tool_ids = tool_working_set
446 .update(cx, |tool_working_set, _| {
447 tools
448 .tools
449 .into_iter()
450 .map(|tool| {
451 log::info!(
452 "registering context server tool: {:?}",
453 tool.name
454 );
455 tool_working_set.insert(Arc::new(
456 ContextServerTool::new(
457 context_server_manager.clone(),
458 server.id(),
459 tool,
460 ),
461 ))
462 })
463 .collect::<Vec<_>>()
464 })
465 .log_err();
466
467 if let Some(tool_ids) = tool_ids {
468 this.update(cx, |this, cx| {
469 this.context_server_tool_ids
470 .insert(server_id, tool_ids);
471 this.load_default_profile(cx);
472 })
473 .log_err();
474 }
475 }
476 }
477 }
478 })
479 .detach();
480 }
481 }
482 context_server::manager::Event::ServerStopped { server_id } => {
483 if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
484 tool_working_set.update(cx, |tool_working_set, _| {
485 tool_working_set.remove(&tool_ids);
486 });
487 self.load_default_profile(cx);
488 }
489 }
490 }
491 }
492}
493
/// Lightweight summary of a saved thread, as produced by `ThreadsDatabase::list_threads`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedThreadMetadata {
    /// Database key of the thread.
    pub id: ThreadId,
    /// The thread's stored summary text.
    pub summary: SharedString,
    /// Stored last-update timestamp; `ThreadStore::threads` sorts by it, newest first.
    pub updated_at: DateTime<Utc>,
}
500
/// On-disk representation of a thread (stored as JSON, see the `heed::BytesEncode` impl).
///
/// Fields marked `#[serde(default)]` may be absent from stored data, in which case
/// they deserialize to their `Default` value.
#[derive(Serialize, Deserialize, Debug)]
pub struct SerializedThread {
    /// Format version; `from_json` only accepts `SerializedThread::VERSION`
    /// (or data with no version field at all, which is treated as legacy).
    pub version: String,
    pub summary: SharedString,
    pub updated_at: DateTime<Utc>,
    pub messages: Vec<SerializedMessage>,
    #[serde(default)]
    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
    #[serde(default)]
    pub cumulative_token_usage: TokenUsage,
    #[serde(default)]
    pub request_token_usage: Vec<TokenUsage>,
    #[serde(default)]
    pub detailed_summary_state: DetailedSummaryState,
    #[serde(default)]
    pub exceeded_window_error: Option<ExceededWindowError>,
}
518
519impl SerializedThread {
520 pub const VERSION: &'static str = "0.1.0";
521
522 pub fn from_json(json: &[u8]) -> Result<Self> {
523 let saved_thread_json = serde_json::from_slice::<serde_json::Value>(json)?;
524 match saved_thread_json.get("version") {
525 Some(serde_json::Value::String(version)) => match version.as_str() {
526 SerializedThread::VERSION => Ok(serde_json::from_value::<SerializedThread>(
527 saved_thread_json,
528 )?),
529 _ => Err(anyhow!(
530 "unrecognized serialized thread version: {}",
531 version
532 )),
533 },
534 None => {
535 let saved_thread =
536 serde_json::from_value::<LegacySerializedThread>(saved_thread_json)?;
537 Ok(saved_thread.upgrade())
538 }
539 version => Err(anyhow!(
540 "unrecognized serialized thread version: {:?}",
541 version
542 )),
543 }
544 }
545}
546
/// On-disk representation of a single message in a thread.
///
/// Every field marked `#[serde(default)]` may be missing from stored data and then
/// deserializes to its empty default.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedMessage {
    pub id: MessageId,
    pub role: Role,
    /// Ordered content segments of the message.
    #[serde(default)]
    pub segments: Vec<SerializedMessageSegment>,
    #[serde(default)]
    pub tool_uses: Vec<SerializedToolUse>,
    #[serde(default)]
    pub tool_results: Vec<SerializedToolResult>,
    /// Empty for messages upgraded from the legacy format.
    #[serde(default)]
    pub context: String,
}
560
/// One segment of a message's content.
///
/// Serialized as an internally tagged object whose `"type"` key is `"text"` or
/// `"thinking"`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SerializedMessageSegment {
    #[serde(rename = "text")]
    Text { text: String },
    #[serde(rename = "thinking")]
    Thinking { text: String },
}
569
/// On-disk representation of a tool invocation recorded on a message.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedToolUse {
    /// Id of this tool use (the same id type as `SerializedToolResult::tool_use_id`).
    pub id: LanguageModelToolUseId,
    /// Name of the invoked tool.
    pub name: SharedString,
    /// Tool input, kept as arbitrary JSON.
    pub input: serde_json::Value,
}
576
/// On-disk representation of the result of a tool invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct SerializedToolResult {
    /// Id of the tool use this result belongs to.
    pub tool_use_id: LanguageModelToolUseId,
    /// Whether the result is flagged as an error.
    pub is_error: bool,
    /// Textual result content.
    pub content: Arc<str>,
}
583
/// Shape of serialized threads that carry no `version` field.
///
/// Only used by `SerializedThread::from_json`, which parses into this type and then
/// calls `upgrade`.
#[derive(Serialize, Deserialize)]
struct LegacySerializedThread {
    pub summary: SharedString,
    pub updated_at: DateTime<Utc>,
    pub messages: Vec<LegacySerializedMessage>,
    #[serde(default)]
    pub initial_project_snapshot: Option<Arc<ProjectSnapshot>>,
}
592
593impl LegacySerializedThread {
594 pub fn upgrade(self) -> SerializedThread {
595 SerializedThread {
596 version: SerializedThread::VERSION.to_string(),
597 summary: self.summary,
598 updated_at: self.updated_at,
599 messages: self.messages.into_iter().map(|msg| msg.upgrade()).collect(),
600 initial_project_snapshot: self.initial_project_snapshot,
601 cumulative_token_usage: TokenUsage::default(),
602 request_token_usage: Vec::new(),
603 detailed_summary_state: DetailedSummaryState::default(),
604 exceeded_window_error: None,
605 }
606 }
607}
608
/// Shape of a message inside a `LegacySerializedThread`.
///
/// Unlike `SerializedMessage`, the content is a single `text` string rather than a
/// list of segments, and there is no `context` field.
#[derive(Debug, Serialize, Deserialize)]
struct LegacySerializedMessage {
    pub id: MessageId,
    pub role: Role,
    pub text: String,
    #[serde(default)]
    pub tool_uses: Vec<SerializedToolUse>,
    #[serde(default)]
    pub tool_results: Vec<SerializedToolResult>,
}
619
620impl LegacySerializedMessage {
621 fn upgrade(self) -> SerializedMessage {
622 SerializedMessage {
623 id: self.id,
624 role: self.role,
625 segments: vec![SerializedMessageSegment::Text { text: self.text }],
626 tool_uses: self.tool_uses,
627 tool_results: self.tool_results,
628 context: String::new(),
629 }
630 }
631}
632
/// Global wrapper around the future that opens the threads database.
///
/// The future is `Shared`, so `ThreadsDatabase::global_future` can hand out clones
/// of it; each resolves to the same `Arc<ThreadsDatabase>` or the same shared error.
struct GlobalThreadsDatabase(
    Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>>,
);

// Allows the wrapper to be stored via `cx.set_global` (see `ThreadsDatabase::init`).
impl Global for GlobalThreadsDatabase {}
638
/// Persistent store of serialized threads, backed by a heed environment.
pub(crate) struct ThreadsDatabase {
    /// Executor on which every database operation is spawned.
    executor: BackgroundExecutor,
    /// The heed environment opened in `new`.
    env: heed::Env,
    /// The `"threads"` database: keys are `ThreadId`s encoded with `SerdeBincode`,
    /// values are `SerializedThread`s encoded through the manual
    /// `BytesEncode`/`BytesDecode` impls (JSON).
    threads: Database<SerdeBincode<ThreadId>, SerializedThread>,
}
644
645impl heed::BytesEncode<'_> for SerializedThread {
646 type EItem = SerializedThread;
647
648 fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, heed::BoxedError> {
649 serde_json::to_vec(item).map(Cow::Owned).map_err(Into::into)
650 }
651}
652
653impl<'a> heed::BytesDecode<'a> for SerializedThread {
654 type DItem = SerializedThread;
655
656 fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {
657 // We implement this type manually because we want to call `SerializedThread::from_json`,
658 // instead of the Deserialize trait implementation for `SerializedThread`.
659 SerializedThread::from_json(bytes).map_err(Into::into)
660 }
661}
662
663impl ThreadsDatabase {
    /// Returns a clone of the shared future stored in the `GlobalThreadsDatabase`
    /// global; awaiting it yields the opened database or the shared open error.
    fn global_future(
        cx: &mut App,
    ) -> Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>> {
        GlobalThreadsDatabase::global(cx).0.clone()
    }
669
670 fn init(cx: &mut App) {
671 let executor = cx.background_executor().clone();
672 let database_future = executor
673 .spawn({
674 let executor = executor.clone();
675 let database_path = paths::data_dir().join("threads/threads-db.1.mdb");
676 async move { ThreadsDatabase::new(database_path, executor) }
677 })
678 .then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
679 .boxed()
680 .shared();
681
682 cx.set_global(GlobalThreadsDatabase(database_future));
683 }
684
    /// Opens (creating if necessary) the threads database at `path`.
    ///
    /// `path` is treated as a directory: it is created together with any missing
    /// parents before the heed environment is opened inside it.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created, the environment cannot be opened,
    /// or the `"threads"` database cannot be created/committed.
    pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
        std::fs::create_dir_all(&path)?;

        // Size passed to `map_size` for the environment.
        const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
        // NOTE(review): `EnvOpenOptions::open` is `unsafe` in heed and this block has
        // no `SAFETY:` justification. Confirm the caller obligations in the heed docs
        // and record here why they hold for this path.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(ONE_GB_IN_BYTES)
                // Only the single named "threads" database is used.
                .max_dbs(1)
                .open(path)?
        };

        // Creating the named database requires a write transaction.
        let mut txn = env.write_txn()?;
        let threads = env.create_database(&mut txn, Some("threads"))?;
        txn.commit()?;

        Ok(Self {
            executor,
            env,
            threads,
        })
    }
706
707 pub fn list_threads(&self) -> Task<Result<Vec<SerializedThreadMetadata>>> {
708 let env = self.env.clone();
709 let threads = self.threads;
710
711 self.executor.spawn(async move {
712 let txn = env.read_txn()?;
713 let mut iter = threads.iter(&txn)?;
714 let mut threads = Vec::new();
715 while let Some((key, value)) = iter.next().transpose()? {
716 threads.push(SerializedThreadMetadata {
717 id: key,
718 summary: value.summary,
719 updated_at: value.updated_at,
720 });
721 }
722
723 Ok(threads)
724 })
725 }
726
727 pub fn try_find_thread(&self, id: ThreadId) -> Task<Result<Option<SerializedThread>>> {
728 let env = self.env.clone();
729 let threads = self.threads;
730
731 self.executor.spawn(async move {
732 let txn = env.read_txn()?;
733 let thread = threads.get(&txn, &id)?;
734 Ok(thread)
735 })
736 }
737
738 pub fn save_thread(&self, id: ThreadId, thread: SerializedThread) -> Task<Result<()>> {
739 let env = self.env.clone();
740 let threads = self.threads;
741
742 self.executor.spawn(async move {
743 let mut txn = env.write_txn()?;
744 threads.put(&mut txn, &id, &thread)?;
745 txn.commit()?;
746 Ok(())
747 })
748 }
749
750 pub fn delete_thread(&self, id: ThreadId) -> Task<Result<()>> {
751 let env = self.env.clone();
752 let threads = self.threads;
753
754 self.executor.spawn(async move {
755 let mut txn = env.write_txn()?;
756 threads.delete(&mut txn, &id)?;
757 txn.commit()?;
758 Ok(())
759 })
760 }
761}