history_store.rs

  1use crate::{DbThreadMetadata, ThreadsDatabase};
  2use acp_thread::MentionUri;
  3use agent_client_protocol as acp;
  4use anyhow::{Context as _, Result, anyhow};
  5use assistant_context::{AssistantContext, SavedContextMetadata};
  6use chrono::{DateTime, Utc};
  7use db::kvp::KEY_VALUE_STORE;
  8use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
  9use itertools::Itertools;
 10use paths::contexts_dir;
 11use serde::{Deserialize, Serialize};
 12use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 13use ui::ElementId;
 14use util::ResultExt as _;
 15
 16const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
 17const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
 18const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
 19
 20const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
 21
 22#[derive(Clone, Debug)]
 23pub enum HistoryEntry {
 24    AcpThread(DbThreadMetadata),
 25    TextThread(SavedContextMetadata),
 26}
 27
 28impl HistoryEntry {
 29    pub fn updated_at(&self) -> DateTime<Utc> {
 30        match self {
 31            HistoryEntry::AcpThread(thread) => thread.updated_at,
 32            HistoryEntry::TextThread(context) => context.mtime.to_utc(),
 33        }
 34    }
 35
 36    pub fn id(&self) -> HistoryEntryId {
 37        match self {
 38            HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
 39            HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
 40        }
 41    }
 42
 43    pub fn mention_uri(&self) -> MentionUri {
 44        match self {
 45            HistoryEntry::AcpThread(thread) => MentionUri::Thread {
 46                id: thread.id.clone(),
 47                name: thread.title.to_string(),
 48            },
 49            HistoryEntry::TextThread(context) => MentionUri::TextThread {
 50                path: context.path.as_ref().to_owned(),
 51                name: context.title.to_string(),
 52            },
 53        }
 54    }
 55
 56    pub fn title(&self) -> &SharedString {
 57        match self {
 58            HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
 59            HistoryEntry::AcpThread(thread) => &thread.title,
 60            HistoryEntry::TextThread(context) => &context.title,
 61        }
 62    }
 63}
 64
 65/// Generic identifier for a history entry.
 66#[derive(Clone, PartialEq, Eq, Debug, Hash)]
 67pub enum HistoryEntryId {
 68    AcpThread(acp::SessionId),
 69    TextThread(Arc<Path>),
 70}
 71
 72impl Into<ElementId> for HistoryEntryId {
 73    fn into(self) -> ElementId {
 74        match self {
 75            HistoryEntryId::AcpThread(session_id) => ElementId::Name(session_id.0.into()),
 76            HistoryEntryId::TextThread(path) => ElementId::Path(path),
 77        }
 78    }
 79}
 80
 81#[derive(Serialize, Deserialize, Debug)]
 82enum SerializedRecentOpen {
 83    AcpThread(String),
 84    TextThread(String),
 85}
 86
 87pub struct HistoryStore {
 88    threads: Vec<DbThreadMetadata>,
 89    entries: Vec<HistoryEntry>,
 90    context_store: Entity<assistant_context::ContextStore>,
 91    recently_opened_entries: VecDeque<HistoryEntryId>,
 92    _subscriptions: Vec<gpui::Subscription>,
 93    _save_recently_opened_entries_task: Task<()>,
 94}
 95
 96impl HistoryStore {
 97    pub fn new(
 98        context_store: Entity<assistant_context::ContextStore>,
 99        cx: &mut Context<Self>,
100    ) -> Self {
101        let subscriptions = vec![cx.observe(&context_store, |this, _, cx| this.update_entries(cx))];
102
103        cx.spawn(async move |this, cx| {
104            let entries = Self::load_recently_opened_entries(cx).await;
105            this.update(cx, |this, cx| {
106                if let Some(entries) = entries.log_err() {
107                    this.recently_opened_entries = entries;
108                }
109
110                this.reload(cx);
111            })
112            .ok();
113        })
114        .detach();
115
116        Self {
117            context_store,
118            recently_opened_entries: VecDeque::default(),
119            threads: Vec::default(),
120            entries: Vec::default(),
121            _subscriptions: subscriptions,
122            _save_recently_opened_entries_task: Task::ready(()),
123        }
124    }
125
126    pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> {
127        self.threads.iter().find(|thread| &thread.id == session_id)
128    }
129
130    pub fn delete_thread(
131        &mut self,
132        id: acp::SessionId,
133        cx: &mut Context<Self>,
134    ) -> Task<Result<()>> {
135        let database_future = ThreadsDatabase::connect(cx);
136        cx.spawn(async move |this, cx| {
137            let database = database_future.await.map_err(|err| anyhow!(err))?;
138            database.delete_thread(id.clone()).await?;
139            this.update(cx, |this, cx| this.reload(cx))
140        })
141    }
142
143    pub fn delete_text_thread(
144        &mut self,
145        path: Arc<Path>,
146        cx: &mut Context<Self>,
147    ) -> Task<Result<()>> {
148        self.context_store.update(cx, |context_store, cx| {
149            context_store.delete_local_context(path, cx)
150        })
151    }
152
153    pub fn load_text_thread(
154        &self,
155        path: Arc<Path>,
156        cx: &mut Context<Self>,
157    ) -> Task<Result<Entity<AssistantContext>>> {
158        self.context_store.update(cx, |context_store, cx| {
159            context_store.open_local_context(path, cx)
160        })
161    }
162
163    pub fn reload(&self, cx: &mut Context<Self>) {
164        let database_future = ThreadsDatabase::connect(cx);
165        cx.spawn(async move |this, cx| {
166            let threads = database_future
167                .await
168                .map_err(|err| anyhow!(err))?
169                .list_threads()
170                .await?;
171
172            this.update(cx, |this, cx| {
173                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
174                    for thread in threads
175                        .iter()
176                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
177                        .rev()
178                    {
179                        this.push_recently_opened_entry(
180                            HistoryEntryId::AcpThread(thread.id.clone()),
181                            cx,
182                        )
183                    }
184                }
185                this.threads = threads;
186                this.update_entries(cx);
187            })
188        })
189        .detach_and_log_err(cx);
190    }
191
192    fn update_entries(&mut self, cx: &mut Context<Self>) {
193        #[cfg(debug_assertions)]
194        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
195            return;
196        }
197        let mut history_entries = Vec::new();
198        history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
199        history_entries.extend(
200            self.context_store
201                .read(cx)
202                .unordered_contexts()
203                .cloned()
204                .map(HistoryEntry::TextThread),
205        );
206
207        history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
208        self.entries = history_entries;
209        cx.notify()
210    }
211
212    pub fn is_empty(&self, _cx: &App) -> bool {
213        self.entries.is_empty()
214    }
215
216    pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
217        #[cfg(debug_assertions)]
218        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
219            return Vec::new();
220        }
221
222        let thread_entries = self.threads.iter().flat_map(|thread| {
223            self.recently_opened_entries
224                .iter()
225                .enumerate()
226                .flat_map(|(index, entry)| match entry {
227                    HistoryEntryId::AcpThread(id) if &thread.id == id => {
228                        Some((index, HistoryEntry::AcpThread(thread.clone())))
229                    }
230                    _ => None,
231                })
232        });
233
234        let context_entries =
235            self.context_store
236                .read(cx)
237                .unordered_contexts()
238                .flat_map(|context| {
239                    self.recently_opened_entries
240                        .iter()
241                        .enumerate()
242                        .flat_map(|(index, entry)| match entry {
243                            HistoryEntryId::TextThread(path) if &context.path == path => {
244                                Some((index, HistoryEntry::TextThread(context.clone())))
245                            }
246                            _ => None,
247                        })
248                });
249
250        thread_entries
251            .chain(context_entries)
252            // optimization to halt iteration early
253            .take(self.recently_opened_entries.len())
254            .sorted_unstable_by_key(|(index, _)| *index)
255            .map(|(_, entry)| entry)
256            .collect()
257    }
258
259    fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
260        let serialized_entries = self
261            .recently_opened_entries
262            .iter()
263            .filter_map(|entry| match entry {
264                HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
265                    SerializedRecentOpen::TextThread(file.to_string_lossy().into_owned())
266                }),
267                HistoryEntryId::AcpThread(id) => {
268                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
269                }
270            })
271            .collect::<Vec<_>>();
272
273        self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
274            let content = serde_json::to_string(&serialized_entries).unwrap();
275            cx.background_executor()
276                .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
277                .await;
278
279            if cfg!(any(feature = "test-support", test)) {
280                return;
281            }
282            KEY_VALUE_STORE
283                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
284                .await
285                .log_err();
286        });
287    }
288
289    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
290        cx.background_spawn(async move {
291            if cfg!(any(feature = "test-support", test)) {
292                anyhow::bail!("history store does not persist in tests");
293            }
294            let json = KEY_VALUE_STORE
295                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
296                .unwrap_or("[]".to_string());
297            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
298                .context("deserializing persisted agent panel navigation history")?
299                .into_iter()
300                .take(MAX_RECENTLY_OPENED_ENTRIES)
301                .flat_map(|entry| match entry {
302                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
303                        acp::SessionId(id.as_str().into()),
304                    )),
305                    SerializedRecentOpen::TextThread(file_name) => Some(
306                        HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
307                    ),
308                })
309                .collect();
310            Ok(entries)
311        })
312    }
313
314    pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
315        self.recently_opened_entries
316            .retain(|old_entry| old_entry != &entry);
317        self.recently_opened_entries.push_front(entry);
318        self.recently_opened_entries
319            .truncate(MAX_RECENTLY_OPENED_ENTRIES);
320        self.save_recently_opened_entries(cx);
321    }
322
323    pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
324        self.recently_opened_entries.retain(
325            |entry| !matches!(entry, HistoryEntryId::AcpThread(thread_id) if thread_id == &id),
326        );
327        self.save_recently_opened_entries(cx);
328    }
329
330    pub fn replace_recently_opened_text_thread(
331        &mut self,
332        old_path: &Path,
333        new_path: &Arc<Path>,
334        cx: &mut Context<Self>,
335    ) {
336        for entry in &mut self.recently_opened_entries {
337            match entry {
338                HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
339                    *entry = HistoryEntryId::TextThread(new_path.clone());
340                    break;
341                }
342                _ => {}
343            }
344        }
345        self.save_recently_opened_entries(cx);
346    }
347
348    pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
349        self.recently_opened_entries
350            .retain(|old_entry| old_entry != entry);
351        self.save_recently_opened_entries(cx);
352    }
353
354    pub fn entries(&self) -> impl Iterator<Item = HistoryEntry> {
355        self.entries.iter().cloned()
356    }
357}