history_store.rs

  1use crate::{DbThreadMetadata, ThreadsDatabase};
  2use acp_thread::MentionUri;
  3use agent_client_protocol as acp;
  4use anyhow::{Context as _, Result, anyhow};
  5use assistant_context::{AssistantContext, SavedContextMetadata};
  6use chrono::{DateTime, Utc};
  7use db::kvp::KEY_VALUE_STORE;
  8use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
  9use itertools::Itertools;
 10use paths::contexts_dir;
 11use serde::{Deserialize, Serialize};
 12use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 13use util::ResultExt as _;
 14
/// Upper bound on the number of entries kept in the recently-opened list.
const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
/// Key under which the recently-opened list is persisted in the key-value store.
const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
/// How long a save waits after a change to the recently-opened list before writing it out.
const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);

/// Title reported for an ACP thread whose stored title is empty.
const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
 20
/// One item of agent history: either an ACP thread or a text thread.
#[derive(Clone, Debug)]
pub enum HistoryEntry {
    /// Metadata of a thread listed by the threads database.
    AcpThread(DbThreadMetadata),
    /// Metadata of a saved text thread (assistant context).
    TextThread(SavedContextMetadata),
}
 26
 27impl HistoryEntry {
 28    pub fn updated_at(&self) -> DateTime<Utc> {
 29        match self {
 30            HistoryEntry::AcpThread(thread) => thread.updated_at,
 31            HistoryEntry::TextThread(context) => context.mtime.to_utc(),
 32        }
 33    }
 34
 35    pub fn id(&self) -> HistoryEntryId {
 36        match self {
 37            HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
 38            HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
 39        }
 40    }
 41
 42    pub fn mention_uri(&self) -> MentionUri {
 43        match self {
 44            HistoryEntry::AcpThread(thread) => MentionUri::Thread {
 45                id: thread.id.clone(),
 46                name: thread.title.to_string(),
 47            },
 48            HistoryEntry::TextThread(context) => MentionUri::TextThread {
 49                path: context.path.as_ref().to_owned(),
 50                name: context.title.to_string(),
 51            },
 52        }
 53    }
 54
 55    pub fn title(&self) -> &SharedString {
 56        match self {
 57            HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
 58            HistoryEntry::AcpThread(thread) => &thread.title,
 59            HistoryEntry::TextThread(context) => &context.title,
 60        }
 61    }
 62}
 63
/// Generic identifier for a history entry.
///
/// This is what the recently-opened list stores; it can be compared and
/// hashed without holding the entry's full metadata.
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
pub enum HistoryEntryId {
    /// An ACP thread, identified by its session id.
    AcpThread(acp::SessionId),
    /// A text thread, identified by the path of its file.
    TextThread(Arc<Path>),
}
 70
/// Persisted (JSON) form of a recently-opened entry.
#[derive(Serialize, Deserialize, Debug)]
enum SerializedRecentOpen {
    /// String form of the thread's session id.
    AcpThread(String),
    /// File name (not the full path) of the text thread; it is joined onto
    /// the contexts directory when loaded.
    TextThread(String),
}
 76
/// Combined view over ACP threads and text threads, plus the list of
/// recently opened entries.
pub struct HistoryStore {
    /// Thread metadata as last returned by the threads database (see `reload`).
    threads: Vec<DbThreadMetadata>,
    /// Store that owns the text threads; observed so that its changes notify
    /// this store's observers.
    context_store: Entity<assistant_context::ContextStore>,
    /// Recently opened entries, most recent at the front, capped at
    /// `MAX_RECENTLY_OPENED_ENTRIES`.
    recently_opened_entries: VecDeque<HistoryEntryId>,
    /// Subscriptions held for as long as the store exists.
    _subscriptions: Vec<gpui::Subscription>,
    /// The most recently scheduled debounced save of `recently_opened_entries`.
    _save_recently_opened_entries_task: Task<()>,
}
 84
 85impl HistoryStore {
 86    pub fn new(
 87        context_store: Entity<assistant_context::ContextStore>,
 88        cx: &mut Context<Self>,
 89    ) -> Self {
 90        let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
 91
 92        cx.spawn(async move |this, cx| {
 93            let entries = Self::load_recently_opened_entries(cx).await;
 94            this.update(cx, |this, cx| {
 95                if let Some(entries) = entries.log_err() {
 96                    this.recently_opened_entries = entries;
 97                }
 98
 99                this.reload(cx);
100            })
101            .ok();
102        })
103        .detach();
104
105        Self {
106            context_store,
107            recently_opened_entries: VecDeque::default(),
108            threads: Vec::default(),
109            _subscriptions: subscriptions,
110            _save_recently_opened_entries_task: Task::ready(()),
111        }
112    }
113
114    pub fn delete_thread(
115        &mut self,
116        id: acp::SessionId,
117        cx: &mut Context<Self>,
118    ) -> Task<Result<()>> {
119        let database_future = ThreadsDatabase::connect(cx);
120        cx.spawn(async move |this, cx| {
121            let database = database_future.await.map_err(|err| anyhow!(err))?;
122            database.delete_thread(id.clone()).await?;
123            this.update(cx, |this, cx| this.reload(cx))
124        })
125    }
126
127    pub fn delete_text_thread(
128        &mut self,
129        path: Arc<Path>,
130        cx: &mut Context<Self>,
131    ) -> Task<Result<()>> {
132        self.context_store.update(cx, |context_store, cx| {
133            context_store.delete_local_context(path, cx)
134        })
135    }
136
137    pub fn load_text_thread(
138        &self,
139        path: Arc<Path>,
140        cx: &mut Context<Self>,
141    ) -> Task<Result<Entity<AssistantContext>>> {
142        self.context_store.update(cx, |context_store, cx| {
143            context_store.open_local_context(path, cx)
144        })
145    }
146
147    pub fn reload(&self, cx: &mut Context<Self>) {
148        let database_future = ThreadsDatabase::connect(cx);
149        cx.spawn(async move |this, cx| {
150            let threads = database_future
151                .await
152                .map_err(|err| anyhow!(err))?
153                .list_threads()
154                .await?;
155
156            this.update(cx, |this, cx| {
157                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
158                    for thread in threads
159                        .iter()
160                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
161                        .rev()
162                    {
163                        this.push_recently_opened_entry(
164                            HistoryEntryId::AcpThread(thread.id.clone()),
165                            cx,
166                        )
167                    }
168                }
169                this.threads = threads;
170                cx.notify();
171            })
172        })
173        .detach_and_log_err(cx);
174    }
175
176    pub fn entries(&self, cx: &App) -> Vec<HistoryEntry> {
177        let mut history_entries = Vec::new();
178
179        #[cfg(debug_assertions)]
180        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
181            return history_entries;
182        }
183
184        history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
185        history_entries.extend(
186            self.context_store
187                .read(cx)
188                .unordered_contexts()
189                .cloned()
190                .map(HistoryEntry::TextThread),
191        );
192
193        history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
194        history_entries
195    }
196
197    pub fn is_empty(&self, cx: &App) -> bool {
198        self.threads.is_empty()
199            && self
200                .context_store
201                .read(cx)
202                .unordered_contexts()
203                .next()
204                .is_none()
205    }
206
207    pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
208        #[cfg(debug_assertions)]
209        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
210            return Vec::new();
211        }
212
213        let thread_entries = self.threads.iter().flat_map(|thread| {
214            self.recently_opened_entries
215                .iter()
216                .enumerate()
217                .flat_map(|(index, entry)| match entry {
218                    HistoryEntryId::AcpThread(id) if &thread.id == id => {
219                        Some((index, HistoryEntry::AcpThread(thread.clone())))
220                    }
221                    _ => None,
222                })
223        });
224
225        let context_entries =
226            self.context_store
227                .read(cx)
228                .unordered_contexts()
229                .flat_map(|context| {
230                    self.recently_opened_entries
231                        .iter()
232                        .enumerate()
233                        .flat_map(|(index, entry)| match entry {
234                            HistoryEntryId::TextThread(path) if &context.path == path => {
235                                Some((index, HistoryEntry::TextThread(context.clone())))
236                            }
237                            _ => None,
238                        })
239                });
240
241        thread_entries
242            .chain(context_entries)
243            // optimization to halt iteration early
244            .take(self.recently_opened_entries.len())
245            .sorted_unstable_by_key(|(index, _)| *index)
246            .map(|(_, entry)| entry)
247            .collect()
248    }
249
250    fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
251        let serialized_entries = self
252            .recently_opened_entries
253            .iter()
254            .filter_map(|entry| match entry {
255                HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
256                    SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
257                }),
258                HistoryEntryId::AcpThread(id) => {
259                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
260                }
261            })
262            .collect::<Vec<_>>();
263
264        self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
265            let content = serde_json::to_string(&serialized_entries).unwrap();
266            cx.background_executor()
267                .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
268                .await;
269
270            if cfg!(any(feature = "test-support", test)) {
271                return;
272            }
273            KEY_VALUE_STORE
274                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
275                .await
276                .log_err();
277        });
278    }
279
280    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
281        cx.background_spawn(async move {
282            if cfg!(any(feature = "test-support", test)) {
283                anyhow::bail!("history store does not persist in tests");
284            }
285            let json = KEY_VALUE_STORE
286                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
287                .unwrap_or("[]".to_string());
288            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
289                .context("deserializing persisted agent panel navigation history")?
290                .into_iter()
291                .take(MAX_RECENTLY_OPENED_ENTRIES)
292                .flat_map(|entry| match entry {
293                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
294                        acp::SessionId(id.as_str().into()),
295                    )),
296                    SerializedRecentOpen::TextThread(file_name) => Some(
297                        HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
298                    ),
299                })
300                .collect();
301            Ok(entries)
302        })
303    }
304
305    pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
306        self.recently_opened_entries
307            .retain(|old_entry| old_entry != &entry);
308        self.recently_opened_entries.push_front(entry);
309        self.recently_opened_entries
310            .truncate(MAX_RECENTLY_OPENED_ENTRIES);
311        self.save_recently_opened_entries(cx);
312    }
313
314    pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
315        self.recently_opened_entries.retain(|entry| match entry {
316            HistoryEntryId::AcpThread(thread_id) if thread_id == &id => false,
317            _ => true,
318        });
319        self.save_recently_opened_entries(cx);
320    }
321
322    pub fn replace_recently_opened_text_thread(
323        &mut self,
324        old_path: &Path,
325        new_path: &Arc<Path>,
326        cx: &mut Context<Self>,
327    ) {
328        for entry in &mut self.recently_opened_entries {
329            match entry {
330                HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
331                    *entry = HistoryEntryId::TextThread(new_path.clone());
332                    break;
333                }
334                _ => {}
335            }
336        }
337        self.save_recently_opened_entries(cx);
338    }
339
340    pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
341        self.recently_opened_entries
342            .retain(|old_entry| old_entry != entry);
343        self.save_recently_opened_entries(cx);
344    }
345}