history_store.rs

  1use crate::{DbThreadMetadata, ThreadsDatabase};
  2use acp_thread::MentionUri;
  3use agent_client_protocol as acp;
  4use anyhow::{Context as _, Result, anyhow};
  5use assistant_context::{AssistantContext, SavedContextMetadata};
  6use chrono::{DateTime, Utc};
  7use db::kvp::KEY_VALUE_STORE;
  8use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
  9use itertools::Itertools;
 10use paths::contexts_dir;
 11use serde::{Deserialize, Serialize};
 12use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 13use ui::ElementId;
 14use util::ResultExt as _;
 15
 16const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
 17const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
 18const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
 19
 20const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
 21
 22#[derive(Clone, Debug)]
 23pub enum HistoryEntry {
 24    AcpThread(DbThreadMetadata),
 25    TextThread(SavedContextMetadata),
 26}
 27
 28impl HistoryEntry {
 29    pub fn updated_at(&self) -> DateTime<Utc> {
 30        match self {
 31            HistoryEntry::AcpThread(thread) => thread.updated_at,
 32            HistoryEntry::TextThread(context) => context.mtime.to_utc(),
 33        }
 34    }
 35
 36    pub fn id(&self) -> HistoryEntryId {
 37        match self {
 38            HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
 39            HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
 40        }
 41    }
 42
 43    pub fn mention_uri(&self) -> MentionUri {
 44        match self {
 45            HistoryEntry::AcpThread(thread) => MentionUri::Thread {
 46                id: thread.id.clone(),
 47                name: thread.title.to_string(),
 48            },
 49            HistoryEntry::TextThread(context) => MentionUri::TextThread {
 50                path: context.path.as_ref().to_owned(),
 51                name: context.title.to_string(),
 52            },
 53        }
 54    }
 55
 56    pub fn title(&self) -> &SharedString {
 57        match self {
 58            HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
 59            HistoryEntry::AcpThread(thread) => &thread.title,
 60            HistoryEntry::TextThread(context) => &context.title,
 61        }
 62    }
 63}
 64
 65/// Generic identifier for a history entry.
 66#[derive(Clone, PartialEq, Eq, Debug, Hash)]
 67pub enum HistoryEntryId {
 68    AcpThread(acp::SessionId),
 69    TextThread(Arc<Path>),
 70}
 71
 72impl Into<ElementId> for HistoryEntryId {
 73    fn into(self) -> ElementId {
 74        match self {
 75            HistoryEntryId::AcpThread(session_id) => ElementId::Name(session_id.0.into()),
 76            HistoryEntryId::TextThread(path) => ElementId::Path(path),
 77        }
 78    }
 79}
 80
 81#[derive(Serialize, Deserialize, Debug)]
 82enum SerializedRecentOpen {
 83    AcpThread(String),
 84    TextThread(String),
 85}
 86
 87pub struct HistoryStore {
 88    threads: Vec<DbThreadMetadata>,
 89    context_store: Entity<assistant_context::ContextStore>,
 90    recently_opened_entries: VecDeque<HistoryEntryId>,
 91    _subscriptions: Vec<gpui::Subscription>,
 92    _save_recently_opened_entries_task: Task<()>,
 93}
 94
 95impl HistoryStore {
 96    pub fn new(
 97        context_store: Entity<assistant_context::ContextStore>,
 98        cx: &mut Context<Self>,
 99    ) -> Self {
100        let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
101
102        cx.spawn(async move |this, cx| {
103            let entries = Self::load_recently_opened_entries(cx).await;
104            this.update(cx, |this, cx| {
105                if let Some(entries) = entries.log_err() {
106                    this.recently_opened_entries = entries;
107                }
108
109                this.reload(cx);
110            })
111            .ok();
112        })
113        .detach();
114
115        Self {
116            context_store,
117            recently_opened_entries: VecDeque::default(),
118            threads: Vec::default(),
119            _subscriptions: subscriptions,
120            _save_recently_opened_entries_task: Task::ready(()),
121        }
122    }
123
124    pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> {
125        self.threads.iter().find(|thread| &thread.id == session_id)
126    }
127
128    pub fn delete_thread(
129        &mut self,
130        id: acp::SessionId,
131        cx: &mut Context<Self>,
132    ) -> Task<Result<()>> {
133        let database_future = ThreadsDatabase::connect(cx);
134        cx.spawn(async move |this, cx| {
135            let database = database_future.await.map_err(|err| anyhow!(err))?;
136            database.delete_thread(id.clone()).await?;
137            this.update(cx, |this, cx| this.reload(cx))
138        })
139    }
140
141    pub fn delete_text_thread(
142        &mut self,
143        path: Arc<Path>,
144        cx: &mut Context<Self>,
145    ) -> Task<Result<()>> {
146        self.context_store.update(cx, |context_store, cx| {
147            context_store.delete_local_context(path, cx)
148        })
149    }
150
151    pub fn load_text_thread(
152        &self,
153        path: Arc<Path>,
154        cx: &mut Context<Self>,
155    ) -> Task<Result<Entity<AssistantContext>>> {
156        self.context_store.update(cx, |context_store, cx| {
157            context_store.open_local_context(path, cx)
158        })
159    }
160
161    pub fn reload(&self, cx: &mut Context<Self>) {
162        let database_future = ThreadsDatabase::connect(cx);
163        cx.spawn(async move |this, cx| {
164            let threads = database_future
165                .await
166                .map_err(|err| anyhow!(err))?
167                .list_threads()
168                .await?;
169
170            this.update(cx, |this, cx| {
171                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
172                    for thread in threads
173                        .iter()
174                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
175                        .rev()
176                    {
177                        this.push_recently_opened_entry(
178                            HistoryEntryId::AcpThread(thread.id.clone()),
179                            cx,
180                        )
181                    }
182                }
183                this.threads = threads;
184                cx.notify();
185            })
186        })
187        .detach_and_log_err(cx);
188    }
189
190    pub fn entries(&self, cx: &App) -> Vec<HistoryEntry> {
191        let mut history_entries = Vec::new();
192
193        #[cfg(debug_assertions)]
194        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
195            return history_entries;
196        }
197
198        history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
199        history_entries.extend(
200            self.context_store
201                .read(cx)
202                .unordered_contexts()
203                .cloned()
204                .map(HistoryEntry::TextThread),
205        );
206
207        history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
208        history_entries
209    }
210
211    pub fn is_empty(&self, cx: &App) -> bool {
212        self.threads.is_empty()
213            && self
214                .context_store
215                .read(cx)
216                .unordered_contexts()
217                .next()
218                .is_none()
219    }
220
221    pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
222        #[cfg(debug_assertions)]
223        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
224            return Vec::new();
225        }
226
227        let thread_entries = self.threads.iter().flat_map(|thread| {
228            self.recently_opened_entries
229                .iter()
230                .enumerate()
231                .flat_map(|(index, entry)| match entry {
232                    HistoryEntryId::AcpThread(id) if &thread.id == id => {
233                        Some((index, HistoryEntry::AcpThread(thread.clone())))
234                    }
235                    _ => None,
236                })
237        });
238
239        let context_entries =
240            self.context_store
241                .read(cx)
242                .unordered_contexts()
243                .flat_map(|context| {
244                    self.recently_opened_entries
245                        .iter()
246                        .enumerate()
247                        .flat_map(|(index, entry)| match entry {
248                            HistoryEntryId::TextThread(path) if &context.path == path => {
249                                Some((index, HistoryEntry::TextThread(context.clone())))
250                            }
251                            _ => None,
252                        })
253                });
254
255        thread_entries
256            .chain(context_entries)
257            // optimization to halt iteration early
258            .take(self.recently_opened_entries.len())
259            .sorted_unstable_by_key(|(index, _)| *index)
260            .map(|(_, entry)| entry)
261            .collect()
262    }
263
264    fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
265        let serialized_entries = self
266            .recently_opened_entries
267            .iter()
268            .filter_map(|entry| match entry {
269                HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
270                    SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
271                }),
272                HistoryEntryId::AcpThread(id) => {
273                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
274                }
275            })
276            .collect::<Vec<_>>();
277
278        self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
279            let content = serde_json::to_string(&serialized_entries).unwrap();
280            cx.background_executor()
281                .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
282                .await;
283
284            if cfg!(any(feature = "test-support", test)) {
285                return;
286            }
287            KEY_VALUE_STORE
288                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
289                .await
290                .log_err();
291        });
292    }
293
294    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
295        cx.background_spawn(async move {
296            if cfg!(any(feature = "test-support", test)) {
297                anyhow::bail!("history store does not persist in tests");
298            }
299            let json = KEY_VALUE_STORE
300                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
301                .unwrap_or("[]".to_string());
302            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
303                .context("deserializing persisted agent panel navigation history")?
304                .into_iter()
305                .take(MAX_RECENTLY_OPENED_ENTRIES)
306                .flat_map(|entry| match entry {
307                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
308                        acp::SessionId(id.as_str().into()),
309                    )),
310                    SerializedRecentOpen::TextThread(file_name) => Some(
311                        HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
312                    ),
313                })
314                .collect();
315            Ok(entries)
316        })
317    }
318
319    pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
320        self.recently_opened_entries
321            .retain(|old_entry| old_entry != &entry);
322        self.recently_opened_entries.push_front(entry);
323        self.recently_opened_entries
324            .truncate(MAX_RECENTLY_OPENED_ENTRIES);
325        self.save_recently_opened_entries(cx);
326    }
327
328    pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
329        self.recently_opened_entries.retain(
330            |entry| !matches!(entry, HistoryEntryId::AcpThread(thread_id) if thread_id == &id),
331        );
332        self.save_recently_opened_entries(cx);
333    }
334
335    pub fn replace_recently_opened_text_thread(
336        &mut self,
337        old_path: &Path,
338        new_path: &Arc<Path>,
339        cx: &mut Context<Self>,
340    ) {
341        for entry in &mut self.recently_opened_entries {
342            match entry {
343                HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
344                    *entry = HistoryEntryId::TextThread(new_path.clone());
345                    break;
346                }
347                _ => {}
348            }
349        }
350        self.save_recently_opened_entries(cx);
351    }
352
353    pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
354        self.recently_opened_entries
355            .retain(|old_entry| old_entry != entry);
356        self.save_recently_opened_entries(cx);
357    }
358
359    pub fn recent_entries(&self, limit: usize, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
360        self.entries(cx).into_iter().take(limit).collect()
361    }
362}