history_store.rs

  1use crate::{DbThreadMetadata, ThreadsDatabase};
  2use agent_client_protocol as acp;
  3use anyhow::{Context as _, Result, anyhow};
  4use assistant_context::SavedContextMetadata;
  5use chrono::{DateTime, Utc};
  6use db::kvp::KEY_VALUE_STORE;
  7use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
  8use itertools::Itertools;
  9use paths::contexts_dir;
 10use serde::{Deserialize, Serialize};
 11use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
 12use util::ResultExt as _;
 13
 14const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
 15const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
 16const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
 17
 18const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
 19
 20#[derive(Clone, Debug)]
 21pub enum HistoryEntry {
 22    AcpThread(DbThreadMetadata),
 23    TextThread(SavedContextMetadata),
 24}
 25
 26impl HistoryEntry {
 27    pub fn updated_at(&self) -> DateTime<Utc> {
 28        match self {
 29            HistoryEntry::AcpThread(thread) => thread.updated_at,
 30            HistoryEntry::TextThread(context) => context.mtime.to_utc(),
 31        }
 32    }
 33
 34    pub fn id(&self) -> HistoryEntryId {
 35        match self {
 36            HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
 37            HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
 38        }
 39    }
 40
 41    pub fn title(&self) -> &SharedString {
 42        match self {
 43            HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
 44            HistoryEntry::AcpThread(thread) => &thread.title,
 45            HistoryEntry::TextThread(context) => &context.title,
 46        }
 47    }
 48}
 49
 50/// Generic identifier for a history entry.
 51#[derive(Clone, PartialEq, Eq, Debug)]
 52pub enum HistoryEntryId {
 53    AcpThread(acp::SessionId),
 54    TextThread(Arc<Path>),
 55}
 56
 57#[derive(Serialize, Deserialize, Debug)]
 58enum SerializedRecentOpen {
 59    AcpThread(String),
 60    TextThread(String),
 61}
 62
 63pub struct HistoryStore {
 64    threads: Vec<DbThreadMetadata>,
 65    context_store: Entity<assistant_context::ContextStore>,
 66    recently_opened_entries: VecDeque<HistoryEntryId>,
 67    _subscriptions: Vec<gpui::Subscription>,
 68    _save_recently_opened_entries_task: Task<()>,
 69}
 70
 71impl HistoryStore {
 72    pub fn new(
 73        context_store: Entity<assistant_context::ContextStore>,
 74        cx: &mut Context<Self>,
 75    ) -> Self {
 76        let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
 77
 78        cx.spawn(async move |this, cx| {
 79            let entries = Self::load_recently_opened_entries(cx).await;
 80            this.update(cx, |this, cx| {
 81                if let Some(entries) = entries.log_err() {
 82                    this.recently_opened_entries = entries;
 83                }
 84
 85                this.reload(cx);
 86            })
 87            .ok();
 88        })
 89        .detach();
 90
 91        Self {
 92            context_store,
 93            recently_opened_entries: VecDeque::default(),
 94            threads: Vec::default(),
 95            _subscriptions: subscriptions,
 96            _save_recently_opened_entries_task: Task::ready(()),
 97        }
 98    }
 99
100    pub fn delete_thread(
101        &mut self,
102        id: acp::SessionId,
103        cx: &mut Context<Self>,
104    ) -> Task<Result<()>> {
105        let database_future = ThreadsDatabase::connect(cx);
106        cx.spawn(async move |this, cx| {
107            let database = database_future.await.map_err(|err| anyhow!(err))?;
108            database.delete_thread(id.clone()).await?;
109            this.update(cx, |this, cx| this.reload(cx))
110        })
111    }
112
113    pub fn delete_text_thread(
114        &mut self,
115        path: Arc<Path>,
116        cx: &mut Context<Self>,
117    ) -> Task<Result<()>> {
118        self.context_store.update(cx, |context_store, cx| {
119            context_store.delete_local_context(path, cx)
120        })
121    }
122
123    pub fn reload(&self, cx: &mut Context<Self>) {
124        let database_future = ThreadsDatabase::connect(cx);
125        cx.spawn(async move |this, cx| {
126            let threads = database_future
127                .await
128                .map_err(|err| anyhow!(err))?
129                .list_threads()
130                .await?;
131
132            this.update(cx, |this, cx| {
133                if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
134                    for thread in threads
135                        .iter()
136                        .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
137                        .rev()
138                    {
139                        this.push_recently_opened_entry(
140                            HistoryEntryId::AcpThread(thread.id.clone()),
141                            cx,
142                        )
143                    }
144                }
145                this.threads = threads;
146                cx.notify();
147            })
148        })
149        .detach_and_log_err(cx);
150    }
151
152    pub fn entries(&self, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
153        let mut history_entries = Vec::new();
154
155        #[cfg(debug_assertions)]
156        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
157            return history_entries;
158        }
159
160        history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
161        history_entries.extend(
162            self.context_store
163                .read(cx)
164                .unordered_contexts()
165                .cloned()
166                .map(HistoryEntry::TextThread),
167        );
168
169        history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
170        history_entries
171    }
172
173    pub fn is_empty(&self, cx: &App) -> bool {
174        self.threads.is_empty()
175            && self
176                .context_store
177                .read(cx)
178                .unordered_contexts()
179                .next()
180                .is_none()
181    }
182
183    pub fn recent_entries(&self, limit: usize, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
184        self.entries(cx).into_iter().take(limit).collect()
185    }
186
187    pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
188        #[cfg(debug_assertions)]
189        if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
190            return Vec::new();
191        }
192
193        let thread_entries = self.threads.iter().flat_map(|thread| {
194            self.recently_opened_entries
195                .iter()
196                .enumerate()
197                .flat_map(|(index, entry)| match entry {
198                    HistoryEntryId::AcpThread(id) if &thread.id == id => {
199                        Some((index, HistoryEntry::AcpThread(thread.clone())))
200                    }
201                    _ => None,
202                })
203        });
204
205        let context_entries =
206            self.context_store
207                .read(cx)
208                .unordered_contexts()
209                .flat_map(|context| {
210                    self.recently_opened_entries
211                        .iter()
212                        .enumerate()
213                        .flat_map(|(index, entry)| match entry {
214                            HistoryEntryId::TextThread(path) if &context.path == path => {
215                                Some((index, HistoryEntry::TextThread(context.clone())))
216                            }
217                            _ => None,
218                        })
219                });
220
221        thread_entries
222            .chain(context_entries)
223            // optimization to halt iteration early
224            .take(self.recently_opened_entries.len())
225            .sorted_unstable_by_key(|(index, _)| *index)
226            .map(|(_, entry)| entry)
227            .collect()
228    }
229
230    fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
231        let serialized_entries = self
232            .recently_opened_entries
233            .iter()
234            .filter_map(|entry| match entry {
235                HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
236                    SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
237                }),
238                HistoryEntryId::AcpThread(id) => {
239                    Some(SerializedRecentOpen::AcpThread(id.to_string()))
240                }
241            })
242            .collect::<Vec<_>>();
243
244        self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
245            let content = serde_json::to_string(&serialized_entries).unwrap();
246            cx.background_executor()
247                .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
248                .await;
249            KEY_VALUE_STORE
250                .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
251                .await
252                .log_err();
253        });
254    }
255
256    fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
257        cx.background_spawn(async move {
258            let json = KEY_VALUE_STORE
259                .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
260                .unwrap_or("[]".to_string());
261            let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
262                .context("deserializing persisted agent panel navigation history")?
263                .into_iter()
264                .take(MAX_RECENTLY_OPENED_ENTRIES)
265                .flat_map(|entry| match entry {
266                    SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
267                        acp::SessionId(id.as_str().into()),
268                    )),
269                    SerializedRecentOpen::TextThread(file_name) => Some(
270                        HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
271                    ),
272                })
273                .collect();
274            Ok(entries)
275        })
276    }
277
278    pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
279        self.recently_opened_entries
280            .retain(|old_entry| old_entry != &entry);
281        self.recently_opened_entries.push_front(entry);
282        self.recently_opened_entries
283            .truncate(MAX_RECENTLY_OPENED_ENTRIES);
284        self.save_recently_opened_entries(cx);
285    }
286
287    pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
288        self.recently_opened_entries.retain(|entry| match entry {
289            HistoryEntryId::AcpThread(thread_id) if thread_id == &id => false,
290            _ => true,
291        });
292        self.save_recently_opened_entries(cx);
293    }
294
295    pub fn replace_recently_opened_text_thread(
296        &mut self,
297        old_path: &Path,
298        new_path: &Arc<Path>,
299        cx: &mut Context<Self>,
300    ) {
301        for entry in &mut self.recently_opened_entries {
302            match entry {
303                HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
304                    *entry = HistoryEntryId::TextThread(new_path.clone());
305                    break;
306                }
307                _ => {}
308            }
309        }
310        self.save_recently_opened_entries(cx);
311    }
312
313    pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
314        self.recently_opened_entries
315            .retain(|old_entry| old_entry != entry);
316        self.save_recently_opened_entries(cx);
317    }
318}