history_store.rs

  1use crate::{DbThreadMetadata, ThreadsDatabase};
  2use acp_thread::MentionUri;
  3use agent_client_protocol as acp;
  4use anyhow::{Context as _, Result, anyhow};
  5use assistant_context::{AssistantContext, SavedContextMetadata};
  6use chrono::{DateTime, Utc};
  7use db::kvp::KEY_VALUE_STORE;
  8use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
  9use itertools::Itertools;
 10use paths::contexts_dir;
 11use serde::{Deserialize, Serialize};
 12use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 13use util::ResultExt as _;
 14
 15const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
 16const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
 17const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
 18
 19const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
 20
 21#[derive(Clone, Debug)]
 22pub enum HistoryEntry {
 23    AcpThread(DbThreadMetadata),
 24    TextThread(SavedContextMetadata),
 25}
 26
 27impl HistoryEntry {
 28    pub fn updated_at(&self) -> DateTime<Utc> {
 29        match self {
 30            HistoryEntry::AcpThread(thread) => thread.updated_at,
 31            HistoryEntry::TextThread(context) => context.mtime.to_utc(),
 32        }
 33    }
 34
 35    pub fn id(&self) -> HistoryEntryId {
 36        match self {
 37            HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
 38            HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
 39        }
 40    }
 41
 42    pub fn mention_uri(&self) -> MentionUri {
 43        match self {
 44            HistoryEntry::AcpThread(thread) => MentionUri::Thread {
 45                id: thread.id.clone(),
 46                name: thread.title.to_string(),
 47            },
 48            HistoryEntry::TextThread(context) => MentionUri::TextThread {
 49                path: context.path.as_ref().to_owned(),
 50                name: context.title.to_string(),
 51            },
 52        }
 53    }
 54
 55    pub fn title(&self) -> &SharedString {
 56        match self {
 57            HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
 58            HistoryEntry::AcpThread(thread) => &thread.title,
 59            HistoryEntry::TextThread(context) => &context.title,
 60        }
 61    }
 62}
 63
 64/// Generic identifier for a history entry.
 65#[derive(Clone, PartialEq, Eq, Debug, Hash)]
 66pub enum HistoryEntryId {
 67    AcpThread(acp::SessionId),
 68    TextThread(Arc<Path>),
 69}
 70
 71#[derive(Serialize, Deserialize, Debug)]
 72enum SerializedRecentOpen {
 73    AcpThread(String),
 74    TextThread(String),
 75}
 76
 77pub struct HistoryStore {
 78    threads: Vec<DbThreadMetadata>,
 79    context_store: Entity<assistant_context::ContextStore>,
 80    recently_opened_entries: VecDeque<HistoryEntryId>,
 81    _subscriptions: Vec<gpui::Subscription>,
 82    _save_recently_opened_entries_task: Task<()>,
 83}
 84
 85impl HistoryStore {
 86    pub fn new(
 87        context_store: Entity<assistant_context::ContextStore>,
 88        cx: &mut Context<Self>,
 89    ) -> Self {
 90        let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
 91
 92        cx.spawn(async move |this, cx| {
 93            let entries = Self::load_recently_opened_entries(cx).await;
 94            this.update(cx, |this, cx| {
 95                if let Some(entries) = entries.log_err() {
 96                    this.recently_opened_entries = entries;
 97                }
 98
 99                this.reload(cx);
100            })
101            .ok();
102        })
103        .detach();
104
105        Self {
106            context_store,
107            recently_opened_entries: VecDeque::default(),
108            threads: Vec::default(),
109            _subscriptions: subscriptions,
110            _save_recently_opened_entries_task: Task::ready(()),
111        }
112    }
113
114    pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> {
115        self.threads.iter().find(|thread| &thread.id == session_id)
116    }
117
118    pub fn delete_thread(
119        &mut self,
120        id: acp::SessionId,
121        cx: &mut Context<Self>,
122    ) -> Task<Result<()>> {
123        let database_future = ThreadsDatabase::connect(cx);
124        cx.spawn(async move |this, cx| {
125            let database = database_future.await.map_err(|err| anyhow!(err))?;
126            database.delete_thread(id.clone()).await?;
127            this.update(cx, |this, cx| this.reload(cx))
128        })
129    }
130
131    pub fn delete_text_thread(
132        &mut self,
133        path: Arc<Path>,
134        cx: &mut Context<Self>,
135    ) -> Task<Result<()>> {
136        self.context_store.update(cx, |context_store, cx| {
137            context_store.delete_local_context(path, cx)
138        })
139    }
140
141    pub fn load_text_thread(
142        &self,
143        path: Arc<Path>,
144        cx: &mut Context<Self>,
145    ) -> Task<Result<Entity<AssistantContext>>> {
146        self.context_store.update(cx, |context_store, cx| {
147            context_store.open_local_context(path, cx)
148        })
149    }
150
151    pub fn reload(&self, cx: &mut Context<Self>) {
152        let database_future = ThreadsDatabase::connect(cx);
153        cx.spawn(async move |this, cx| {
154            let threads = database_future
155                .await
156                .map_err(|err| anyhow!(err))?
157                .list_threads()
158                .await?;
159
160            this.update(cx, |this, cx| {
161                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
162                    for thread in threads
163                        .iter()
164                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
165                        .rev()
166                    {
167                        this.push_recently_opened_entry(
168                            HistoryEntryId::AcpThread(thread.id.clone()),
169                            cx,
170                        )
171                    }
172                }
173                this.threads = threads;
174                cx.notify();
175            })
176        })
177        .detach_and_log_err(cx);
178    }
179
180    pub fn entries(&self, cx: &App) -> Vec<HistoryEntry> {
181        let mut history_entries = Vec::new();
182
183        #[cfg(debug_assertions)]
184        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
185            return history_entries;
186        }
187
188        history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
189        history_entries.extend(
190            self.context_store
191                .read(cx)
192                .unordered_contexts()
193                .cloned()
194                .map(HistoryEntry::TextThread),
195        );
196
197        history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
198        history_entries
199    }
200
201    pub fn is_empty(&self, cx: &App) -> bool {
202        self.threads.is_empty()
203            && self
204                .context_store
205                .read(cx)
206                .unordered_contexts()
207                .next()
208                .is_none()
209    }
210
211    pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
212        #[cfg(debug_assertions)]
213        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
214            return Vec::new();
215        }
216
217        let thread_entries = self.threads.iter().flat_map(|thread| {
218            self.recently_opened_entries
219                .iter()
220                .enumerate()
221                .flat_map(|(index, entry)| match entry {
222                    HistoryEntryId::AcpThread(id) if &thread.id == id => {
223                        Some((index, HistoryEntry::AcpThread(thread.clone())))
224                    }
225                    _ => None,
226                })
227        });
228
229        let context_entries =
230            self.context_store
231                .read(cx)
232                .unordered_contexts()
233                .flat_map(|context| {
234                    self.recently_opened_entries
235                        .iter()
236                        .enumerate()
237                        .flat_map(|(index, entry)| match entry {
238                            HistoryEntryId::TextThread(path) if &context.path == path => {
239                                Some((index, HistoryEntry::TextThread(context.clone())))
240                            }
241                            _ => None,
242                        })
243                });
244
245        thread_entries
246            .chain(context_entries)
247            // optimization to halt iteration early
248            .take(self.recently_opened_entries.len())
249            .sorted_unstable_by_key(|(index, _)| *index)
250            .map(|(_, entry)| entry)
251            .collect()
252    }
253
254    fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
255        let serialized_entries = self
256            .recently_opened_entries
257            .iter()
258            .filter_map(|entry| match entry {
259                HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
260                    SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
261                }),
262                HistoryEntryId::AcpThread(id) => {
263                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
264                }
265            })
266            .collect::<Vec<_>>();
267
268        self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
269            let content = serde_json::to_string(&serialized_entries).unwrap();
270            cx.background_executor()
271                .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
272                .await;
273
274            if cfg!(any(feature = "test-support", test)) {
275                return;
276            }
277            KEY_VALUE_STORE
278                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
279                .await
280                .log_err();
281        });
282    }
283
284    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
285        cx.background_spawn(async move {
286            if cfg!(any(feature = "test-support", test)) {
287                anyhow::bail!("history store does not persist in tests");
288            }
289            let json = KEY_VALUE_STORE
290                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
291                .unwrap_or("[]".to_string());
292            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
293                .context("deserializing persisted agent panel navigation history")?
294                .into_iter()
295                .take(MAX_RECENTLY_OPENED_ENTRIES)
296                .flat_map(|entry| match entry {
297                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
298                        acp::SessionId(id.as_str().into()),
299                    )),
300                    SerializedRecentOpen::TextThread(file_name) => Some(
301                        HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
302                    ),
303                })
304                .collect();
305            Ok(entries)
306        })
307    }
308
309    pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
310        self.recently_opened_entries
311            .retain(|old_entry| old_entry != &entry);
312        self.recently_opened_entries.push_front(entry);
313        self.recently_opened_entries
314            .truncate(MAX_RECENTLY_OPENED_ENTRIES);
315        self.save_recently_opened_entries(cx);
316    }
317
318    pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
319        self.recently_opened_entries.retain(
320            |entry| !matches!(entry, HistoryEntryId::AcpThread(thread_id) if thread_id == &id),
321        );
322        self.save_recently_opened_entries(cx);
323    }
324
325    pub fn replace_recently_opened_text_thread(
326        &mut self,
327        old_path: &Path,
328        new_path: &Arc<Path>,
329        cx: &mut Context<Self>,
330    ) {
331        for entry in &mut self.recently_opened_entries {
332            match entry {
333                HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
334                    *entry = HistoryEntryId::TextThread(new_path.clone());
335                    break;
336                }
337                _ => {}
338            }
339        }
340        self.save_recently_opened_entries(cx);
341    }
342
343    pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
344        self.recently_opened_entries
345            .retain(|old_entry| old_entry != entry);
346        self.save_recently_opened_entries(cx);
347    }
348}