1use crate::{DbThreadMetadata, ThreadsDatabase};
2use acp_thread::MentionUri;
3use agent_client_protocol as acp;
4use anyhow::{Context as _, Result, anyhow};
5use assistant_context::{AssistantContext, SavedContextMetadata};
6use chrono::{DateTime, Utc};
7use db::kvp::KEY_VALUE_STORE;
8use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
9use itertools::Itertools;
10use paths::contexts_dir;
11use serde::{Deserialize, Serialize};
12use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
13use util::ResultExt as _;
14
15const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
16const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
17const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
18
19const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
20
21#[derive(Clone, Debug)]
22pub enum HistoryEntry {
23 AcpThread(DbThreadMetadata),
24 TextThread(SavedContextMetadata),
25}
26
27impl HistoryEntry {
28 pub fn updated_at(&self) -> DateTime<Utc> {
29 match self {
30 HistoryEntry::AcpThread(thread) => thread.updated_at,
31 HistoryEntry::TextThread(context) => context.mtime.to_utc(),
32 }
33 }
34
35 pub fn id(&self) -> HistoryEntryId {
36 match self {
37 HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
38 HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
39 }
40 }
41
42 pub fn mention_uri(&self) -> MentionUri {
43 match self {
44 HistoryEntry::AcpThread(thread) => MentionUri::Thread {
45 id: thread.id.clone(),
46 name: thread.title.to_string(),
47 },
48 HistoryEntry::TextThread(context) => MentionUri::TextThread {
49 path: context.path.as_ref().to_owned(),
50 name: context.title.to_string(),
51 },
52 }
53 }
54
55 pub fn title(&self) -> &SharedString {
56 match self {
57 HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
58 HistoryEntry::AcpThread(thread) => &thread.title,
59 HistoryEntry::TextThread(context) => &context.title,
60 }
61 }
62}
63
64/// Generic identifier for a history entry.
65#[derive(Clone, PartialEq, Eq, Debug, Hash)]
66pub enum HistoryEntryId {
67 AcpThread(acp::SessionId),
68 TextThread(Arc<Path>),
69}
70
71#[derive(Serialize, Deserialize, Debug)]
72enum SerializedRecentOpen {
73 AcpThread(String),
74 TextThread(String),
75}
76
77pub struct HistoryStore {
78 threads: Vec<DbThreadMetadata>,
79 context_store: Entity<assistant_context::ContextStore>,
80 recently_opened_entries: VecDeque<HistoryEntryId>,
81 _subscriptions: Vec<gpui::Subscription>,
82 _save_recently_opened_entries_task: Task<()>,
83}
84
85impl HistoryStore {
86 pub fn new(
87 context_store: Entity<assistant_context::ContextStore>,
88 cx: &mut Context<Self>,
89 ) -> Self {
90 let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
91
92 cx.spawn(async move |this, cx| {
93 let entries = Self::load_recently_opened_entries(cx).await;
94 this.update(cx, |this, cx| {
95 if let Some(entries) = entries.log_err() {
96 this.recently_opened_entries = entries;
97 }
98
99 this.reload(cx);
100 })
101 .ok();
102 })
103 .detach();
104
105 Self {
106 context_store,
107 recently_opened_entries: VecDeque::default(),
108 threads: Vec::default(),
109 _subscriptions: subscriptions,
110 _save_recently_opened_entries_task: Task::ready(()),
111 }
112 }
113
114 pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> {
115 self.threads.iter().find(|thread| &thread.id == session_id)
116 }
117
118 pub fn delete_thread(
119 &mut self,
120 id: acp::SessionId,
121 cx: &mut Context<Self>,
122 ) -> Task<Result<()>> {
123 let database_future = ThreadsDatabase::connect(cx);
124 cx.spawn(async move |this, cx| {
125 let database = database_future.await.map_err(|err| anyhow!(err))?;
126 database.delete_thread(id.clone()).await?;
127 this.update(cx, |this, cx| this.reload(cx))
128 })
129 }
130
131 pub fn delete_text_thread(
132 &mut self,
133 path: Arc<Path>,
134 cx: &mut Context<Self>,
135 ) -> Task<Result<()>> {
136 self.context_store.update(cx, |context_store, cx| {
137 context_store.delete_local_context(path, cx)
138 })
139 }
140
141 pub fn load_text_thread(
142 &self,
143 path: Arc<Path>,
144 cx: &mut Context<Self>,
145 ) -> Task<Result<Entity<AssistantContext>>> {
146 self.context_store.update(cx, |context_store, cx| {
147 context_store.open_local_context(path, cx)
148 })
149 }
150
151 pub fn reload(&self, cx: &mut Context<Self>) {
152 let database_future = ThreadsDatabase::connect(cx);
153 cx.spawn(async move |this, cx| {
154 let threads = database_future
155 .await
156 .map_err(|err| anyhow!(err))?
157 .list_threads()
158 .await?;
159
160 this.update(cx, |this, cx| {
161 if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
162 for thread in threads
163 .iter()
164 .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
165 .rev()
166 {
167 this.push_recently_opened_entry(
168 HistoryEntryId::AcpThread(thread.id.clone()),
169 cx,
170 )
171 }
172 }
173 this.threads = threads;
174 cx.notify();
175 })
176 })
177 .detach_and_log_err(cx);
178 }
179
180 pub fn entries(&self, cx: &App) -> Vec<HistoryEntry> {
181 let mut history_entries = Vec::new();
182
183 #[cfg(debug_assertions)]
184 if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
185 return history_entries;
186 }
187
188 history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
189 history_entries.extend(
190 self.context_store
191 .read(cx)
192 .unordered_contexts()
193 .cloned()
194 .map(HistoryEntry::TextThread),
195 );
196
197 history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
198 history_entries
199 }
200
201 pub fn is_empty(&self, cx: &App) -> bool {
202 self.threads.is_empty()
203 && self
204 .context_store
205 .read(cx)
206 .unordered_contexts()
207 .next()
208 .is_none()
209 }
210
211 pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
212 #[cfg(debug_assertions)]
213 if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
214 return Vec::new();
215 }
216
217 let thread_entries = self.threads.iter().flat_map(|thread| {
218 self.recently_opened_entries
219 .iter()
220 .enumerate()
221 .flat_map(|(index, entry)| match entry {
222 HistoryEntryId::AcpThread(id) if &thread.id == id => {
223 Some((index, HistoryEntry::AcpThread(thread.clone())))
224 }
225 _ => None,
226 })
227 });
228
229 let context_entries =
230 self.context_store
231 .read(cx)
232 .unordered_contexts()
233 .flat_map(|context| {
234 self.recently_opened_entries
235 .iter()
236 .enumerate()
237 .flat_map(|(index, entry)| match entry {
238 HistoryEntryId::TextThread(path) if &context.path == path => {
239 Some((index, HistoryEntry::TextThread(context.clone())))
240 }
241 _ => None,
242 })
243 });
244
245 thread_entries
246 .chain(context_entries)
247 // optimization to halt iteration early
248 .take(self.recently_opened_entries.len())
249 .sorted_unstable_by_key(|(index, _)| *index)
250 .map(|(_, entry)| entry)
251 .collect()
252 }
253
254 fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
255 let serialized_entries = self
256 .recently_opened_entries
257 .iter()
258 .filter_map(|entry| match entry {
259 HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
260 SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
261 }),
262 HistoryEntryId::AcpThread(id) => {
263 Some(SerializedRecentOpen::AcpThread(id.to_string()))
264 }
265 })
266 .collect::<Vec<_>>();
267
268 self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
269 let content = serde_json::to_string(&serialized_entries).unwrap();
270 cx.background_executor()
271 .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
272 .await;
273
274 if cfg!(any(feature = "test-support", test)) {
275 return;
276 }
277 KEY_VALUE_STORE
278 .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
279 .await
280 .log_err();
281 });
282 }
283
284 fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
285 cx.background_spawn(async move {
286 if cfg!(any(feature = "test-support", test)) {
287 anyhow::bail!("history store does not persist in tests");
288 }
289 let json = KEY_VALUE_STORE
290 .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
291 .unwrap_or("[]".to_string());
292 let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
293 .context("deserializing persisted agent panel navigation history")?
294 .into_iter()
295 .take(MAX_RECENTLY_OPENED_ENTRIES)
296 .flat_map(|entry| match entry {
297 SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
298 acp::SessionId(id.as_str().into()),
299 )),
300 SerializedRecentOpen::TextThread(file_name) => Some(
301 HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
302 ),
303 })
304 .collect();
305 Ok(entries)
306 })
307 }
308
309 pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
310 self.recently_opened_entries
311 .retain(|old_entry| old_entry != &entry);
312 self.recently_opened_entries.push_front(entry);
313 self.recently_opened_entries
314 .truncate(MAX_RECENTLY_OPENED_ENTRIES);
315 self.save_recently_opened_entries(cx);
316 }
317
318 pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
319 self.recently_opened_entries.retain(
320 |entry| !matches!(entry, HistoryEntryId::AcpThread(thread_id) if thread_id == &id),
321 );
322 self.save_recently_opened_entries(cx);
323 }
324
325 pub fn replace_recently_opened_text_thread(
326 &mut self,
327 old_path: &Path,
328 new_path: &Arc<Path>,
329 cx: &mut Context<Self>,
330 ) {
331 for entry in &mut self.recently_opened_entries {
332 match entry {
333 HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
334 *entry = HistoryEntryId::TextThread(new_path.clone());
335 break;
336 }
337 _ => {}
338 }
339 }
340 self.save_recently_opened_entries(cx);
341 }
342
343 pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
344 self.recently_opened_entries
345 .retain(|old_entry| old_entry != entry);
346 self.save_recently_opened_entries(cx);
347 }
348}