1use crate::{DbThreadMetadata, ThreadsDatabase};
2use agent_client_protocol as acp;
3use anyhow::{Context as _, Result, anyhow};
4use assistant_context::SavedContextMetadata;
5use chrono::{DateTime, Utc};
6use db::kvp::KEY_VALUE_STORE;
7use gpui::{App, AsyncApp, Entity, SharedString, Task, prelude::*};
8use itertools::Itertools;
9use paths::contexts_dir;
10use serde::{Deserialize, Serialize};
11use std::{collections::VecDeque, path::Path, sync::Arc, time::Duration};
12use util::ResultExt as _;
13
14const MAX_RECENTLY_OPENED_ENTRIES: usize = 6;
15const RECENTLY_OPENED_THREADS_KEY: &str = "recent-agent-threads";
16const SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE: Duration = Duration::from_millis(50);
17
18const DEFAULT_TITLE: &SharedString = &SharedString::new_static("New Thread");
19
20#[derive(Clone, Debug)]
21pub enum HistoryEntry {
22 AcpThread(DbThreadMetadata),
23 TextThread(SavedContextMetadata),
24}
25
26impl HistoryEntry {
27 pub fn updated_at(&self) -> DateTime<Utc> {
28 match self {
29 HistoryEntry::AcpThread(thread) => thread.updated_at,
30 HistoryEntry::TextThread(context) => context.mtime.to_utc(),
31 }
32 }
33
34 pub fn id(&self) -> HistoryEntryId {
35 match self {
36 HistoryEntry::AcpThread(thread) => HistoryEntryId::AcpThread(thread.id.clone()),
37 HistoryEntry::TextThread(context) => HistoryEntryId::TextThread(context.path.clone()),
38 }
39 }
40
41 pub fn title(&self) -> &SharedString {
42 match self {
43 HistoryEntry::AcpThread(thread) if thread.title.is_empty() => DEFAULT_TITLE,
44 HistoryEntry::AcpThread(thread) => &thread.title,
45 HistoryEntry::TextThread(context) => &context.title,
46 }
47 }
48}
49
50/// Generic identifier for a history entry.
51#[derive(Clone, PartialEq, Eq, Debug)]
52pub enum HistoryEntryId {
53 AcpThread(acp::SessionId),
54 TextThread(Arc<Path>),
55}
56
57#[derive(Serialize, Deserialize, Debug)]
58enum SerializedRecentOpen {
59 AcpThread(String),
60 TextThread(String),
61}
62
63pub struct HistoryStore {
64 threads: Vec<DbThreadMetadata>,
65 context_store: Entity<assistant_context::ContextStore>,
66 recently_opened_entries: VecDeque<HistoryEntryId>,
67 _subscriptions: Vec<gpui::Subscription>,
68 _save_recently_opened_entries_task: Task<()>,
69}
70
71impl HistoryStore {
72 pub fn new(
73 context_store: Entity<assistant_context::ContextStore>,
74 cx: &mut Context<Self>,
75 ) -> Self {
76 let subscriptions = vec![cx.observe(&context_store, |_, _, cx| cx.notify())];
77
78 cx.spawn(async move |this, cx| {
79 let entries = Self::load_recently_opened_entries(cx).await;
80 this.update(cx, |this, cx| {
81 if let Some(entries) = entries.log_err() {
82 this.recently_opened_entries = entries;
83 }
84
85 this.reload(cx);
86 })
87 .ok();
88 })
89 .detach();
90
91 Self {
92 context_store,
93 recently_opened_entries: VecDeque::default(),
94 threads: Vec::default(),
95 _subscriptions: subscriptions,
96 _save_recently_opened_entries_task: Task::ready(()),
97 }
98 }
99
100 pub fn delete_thread(
101 &mut self,
102 id: acp::SessionId,
103 cx: &mut Context<Self>,
104 ) -> Task<Result<()>> {
105 let database_future = ThreadsDatabase::connect(cx);
106 cx.spawn(async move |this, cx| {
107 let database = database_future.await.map_err(|err| anyhow!(err))?;
108 database.delete_thread(id.clone()).await?;
109 this.update(cx, |this, cx| this.reload(cx))
110 })
111 }
112
113 pub fn delete_text_thread(
114 &mut self,
115 path: Arc<Path>,
116 cx: &mut Context<Self>,
117 ) -> Task<Result<()>> {
118 self.context_store.update(cx, |context_store, cx| {
119 context_store.delete_local_context(path, cx)
120 })
121 }
122
123 pub fn reload(&self, cx: &mut Context<Self>) {
124 let database_future = ThreadsDatabase::connect(cx);
125 cx.spawn(async move |this, cx| {
126 let threads = database_future
127 .await
128 .map_err(|err| anyhow!(err))?
129 .list_threads()
130 .await?;
131
132 this.update(cx, |this, cx| {
133 if this.recently_opened_entries.len() < MAX_RECENTLY_OPENED_ENTRIES {
134 for thread in threads
135 .iter()
136 .take(MAX_RECENTLY_OPENED_ENTRIES - this.recently_opened_entries.len())
137 .rev()
138 {
139 this.push_recently_opened_entry(
140 HistoryEntryId::AcpThread(thread.id.clone()),
141 cx,
142 )
143 }
144 }
145 this.threads = threads;
146 cx.notify();
147 })
148 })
149 .detach_and_log_err(cx);
150 }
151
152 pub fn entries(&self, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
153 let mut history_entries = Vec::new();
154
155 #[cfg(debug_assertions)]
156 if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
157 return history_entries;
158 }
159
160 history_entries.extend(self.threads.iter().cloned().map(HistoryEntry::AcpThread));
161 history_entries.extend(
162 self.context_store
163 .read(cx)
164 .unordered_contexts()
165 .cloned()
166 .map(HistoryEntry::TextThread),
167 );
168
169 history_entries.sort_unstable_by_key(|entry| std::cmp::Reverse(entry.updated_at()));
170 history_entries
171 }
172
173 pub fn is_empty(&self, cx: &App) -> bool {
174 self.threads.is_empty()
175 && self
176 .context_store
177 .read(cx)
178 .unordered_contexts()
179 .next()
180 .is_none()
181 }
182
183 pub fn recent_entries(&self, limit: usize, cx: &mut Context<Self>) -> Vec<HistoryEntry> {
184 self.entries(cx).into_iter().take(limit).collect()
185 }
186
187 pub fn recently_opened_entries(&self, cx: &App) -> Vec<HistoryEntry> {
188 #[cfg(debug_assertions)]
189 if std::env::var("ZED_SIMULATE_NO_THREAD_HISTORY").is_ok() {
190 return Vec::new();
191 }
192
193 let thread_entries = self.threads.iter().flat_map(|thread| {
194 self.recently_opened_entries
195 .iter()
196 .enumerate()
197 .flat_map(|(index, entry)| match entry {
198 HistoryEntryId::AcpThread(id) if &thread.id == id => {
199 Some((index, HistoryEntry::AcpThread(thread.clone())))
200 }
201 _ => None,
202 })
203 });
204
205 let context_entries =
206 self.context_store
207 .read(cx)
208 .unordered_contexts()
209 .flat_map(|context| {
210 self.recently_opened_entries
211 .iter()
212 .enumerate()
213 .flat_map(|(index, entry)| match entry {
214 HistoryEntryId::TextThread(path) if &context.path == path => {
215 Some((index, HistoryEntry::TextThread(context.clone())))
216 }
217 _ => None,
218 })
219 });
220
221 thread_entries
222 .chain(context_entries)
223 // optimization to halt iteration early
224 .take(self.recently_opened_entries.len())
225 .sorted_unstable_by_key(|(index, _)| *index)
226 .map(|(_, entry)| entry)
227 .collect()
228 }
229
230 fn save_recently_opened_entries(&mut self, cx: &mut Context<Self>) {
231 let serialized_entries = self
232 .recently_opened_entries
233 .iter()
234 .filter_map(|entry| match entry {
235 HistoryEntryId::TextThread(path) => path.file_name().map(|file| {
236 SerializedRecentOpen::TextThread(file.to_string_lossy().to_string())
237 }),
238 HistoryEntryId::AcpThread(id) => {
239 Some(SerializedRecentOpen::AcpThread(id.to_string()))
240 }
241 })
242 .collect::<Vec<_>>();
243
244 self._save_recently_opened_entries_task = cx.spawn(async move |_, cx| {
245 let content = serde_json::to_string(&serialized_entries).unwrap();
246 cx.background_executor()
247 .timer(SAVE_RECENTLY_OPENED_ENTRIES_DEBOUNCE)
248 .await;
249 KEY_VALUE_STORE
250 .write_kvp(RECENTLY_OPENED_THREADS_KEY.to_owned(), content)
251 .await
252 .log_err();
253 });
254 }
255
256 fn load_recently_opened_entries(cx: &AsyncApp) -> Task<Result<VecDeque<HistoryEntryId>>> {
257 cx.background_spawn(async move {
258 let json = KEY_VALUE_STORE
259 .read_kvp(RECENTLY_OPENED_THREADS_KEY)?
260 .unwrap_or("[]".to_string());
261 let entries = serde_json::from_str::<Vec<SerializedRecentOpen>>(&json)
262 .context("deserializing persisted agent panel navigation history")?
263 .into_iter()
264 .take(MAX_RECENTLY_OPENED_ENTRIES)
265 .flat_map(|entry| match entry {
266 SerializedRecentOpen::AcpThread(id) => Some(HistoryEntryId::AcpThread(
267 acp::SessionId(id.as_str().into()),
268 )),
269 SerializedRecentOpen::TextThread(file_name) => Some(
270 HistoryEntryId::TextThread(contexts_dir().join(file_name).into()),
271 ),
272 })
273 .collect();
274 Ok(entries)
275 })
276 }
277
278 pub fn push_recently_opened_entry(&mut self, entry: HistoryEntryId, cx: &mut Context<Self>) {
279 self.recently_opened_entries
280 .retain(|old_entry| old_entry != &entry);
281 self.recently_opened_entries.push_front(entry);
282 self.recently_opened_entries
283 .truncate(MAX_RECENTLY_OPENED_ENTRIES);
284 self.save_recently_opened_entries(cx);
285 }
286
287 pub fn remove_recently_opened_thread(&mut self, id: acp::SessionId, cx: &mut Context<Self>) {
288 self.recently_opened_entries.retain(|entry| match entry {
289 HistoryEntryId::AcpThread(thread_id) if thread_id == &id => false,
290 _ => true,
291 });
292 self.save_recently_opened_entries(cx);
293 }
294
295 pub fn replace_recently_opened_text_thread(
296 &mut self,
297 old_path: &Path,
298 new_path: &Arc<Path>,
299 cx: &mut Context<Self>,
300 ) {
301 for entry in &mut self.recently_opened_entries {
302 match entry {
303 HistoryEntryId::TextThread(path) if path.as_ref() == old_path => {
304 *entry = HistoryEntryId::TextThread(new_path.clone());
305 break;
306 }
307 _ => {}
308 }
309 }
310 self.save_recently_opened_entries(cx);
311 }
312
313 pub fn remove_recently_opened_entry(&mut self, entry: &HistoryEntryId, cx: &mut Context<Self>) {
314 self.recently_opened_entries
315 .retain(|old_entry| old_entry != entry);
316 self.save_recently_opened_entries(cx);
317 }
318}