1use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest, SessionListUpdate};
2use agent_client_protocol as acp;
3use gpui::{App, Task};
4use std::rc::Rc;
5use ui::prelude::*;
6
7pub struct ThreadHistory {
8 session_list: Rc<dyn AgentSessionList>,
9 sessions: Vec<AgentSessionInfo>,
10 _refresh_task: Task<()>,
11 _watch_task: Option<Task<()>>,
12}
13
14impl ThreadHistory {
15 pub fn new(session_list: Rc<dyn AgentSessionList>, cx: &mut Context<Self>) -> Self {
16 let mut this = Self {
17 session_list,
18 sessions: Vec::new(),
19 _refresh_task: Task::ready(()),
20 _watch_task: None,
21 };
22
23 this.start_watching(cx);
24 this
25 }
26
27 #[cfg(any(test, feature = "test-support"))]
28 pub fn set_session_list(
29 &mut self,
30 session_list: Rc<dyn AgentSessionList>,
31 cx: &mut Context<Self>,
32 ) {
33 if Rc::ptr_eq(&self.session_list, &session_list) {
34 return;
35 }
36
37 self.session_list = session_list;
38 self.sessions.clear();
39 self._refresh_task = Task::ready(());
40 self.start_watching(cx);
41 }
42
43 fn start_watching(&mut self, cx: &mut Context<Self>) {
44 let Some(rx) = self.session_list.watch(cx) else {
45 self._watch_task = None;
46 self.refresh_sessions(false, cx);
47 return;
48 };
49 self.session_list.notify_refresh();
50
51 self._watch_task = Some(cx.spawn(async move |this, cx| {
52 while let Ok(first_update) = rx.recv().await {
53 let mut updates = vec![first_update];
54 while let Ok(update) = rx.try_recv() {
55 updates.push(update);
56 }
57
58 this.update(cx, |this, cx| {
59 let needs_refresh = updates
60 .iter()
61 .any(|u| matches!(u, SessionListUpdate::Refresh));
62
63 if needs_refresh {
64 this.refresh_sessions(false, cx);
65 } else {
66 for update in updates {
67 if let SessionListUpdate::SessionInfo { session_id, update } = update {
68 this.apply_info_update(session_id, update, cx);
69 }
70 }
71 }
72 })
73 .ok();
74 }
75 }));
76 }
77
78 pub(crate) fn refresh_full_history(&mut self, cx: &mut Context<Self>) {
79 self.refresh_sessions(true, cx);
80 }
81
82 fn apply_info_update(
83 &mut self,
84 session_id: acp::SessionId,
85 info_update: acp::SessionInfoUpdate,
86 cx: &mut Context<Self>,
87 ) {
88 let Some(session) = self
89 .sessions
90 .iter_mut()
91 .find(|s| s.session_id == session_id)
92 else {
93 return;
94 };
95
96 match info_update.title {
97 acp::MaybeUndefined::Value(title) => {
98 session.title = Some(title.into());
99 }
100 acp::MaybeUndefined::Null => {
101 session.title = None;
102 }
103 acp::MaybeUndefined::Undefined => {}
104 }
105 match info_update.updated_at {
106 acp::MaybeUndefined::Value(date_str) => {
107 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&date_str) {
108 session.updated_at = Some(dt.with_timezone(&chrono::Utc));
109 }
110 }
111 acp::MaybeUndefined::Null => {
112 session.updated_at = None;
113 }
114 acp::MaybeUndefined::Undefined => {}
115 }
116 if let Some(meta) = info_update.meta {
117 session.meta = Some(meta);
118 }
119
120 cx.notify();
121 }
122
123 fn refresh_sessions(&mut self, load_all_pages: bool, cx: &mut Context<Self>) {
124 let session_list = self.session_list.clone();
125
126 self._refresh_task = cx.spawn(async move |this, cx| {
127 let mut cursor: Option<String> = None;
128 let mut is_first_page = true;
129
130 loop {
131 let request = AgentSessionListRequest {
132 cursor: cursor.clone(),
133 ..Default::default()
134 };
135 let task = cx.update(|cx| session_list.list_sessions(request, cx));
136 let response = match task.await {
137 Ok(response) => response,
138 Err(error) => {
139 log::error!("Failed to load session history: {error:#}");
140 return;
141 }
142 };
143
144 let acp_thread::AgentSessionListResponse {
145 sessions: page_sessions,
146 next_cursor,
147 ..
148 } = response;
149
150 this.update(cx, |this, cx| {
151 if is_first_page {
152 this.sessions = page_sessions;
153 } else {
154 this.sessions.extend(page_sessions);
155 }
156 cx.notify();
157 })
158 .ok();
159
160 is_first_page = false;
161 if !load_all_pages {
162 break;
163 }
164
165 match next_cursor {
166 Some(next_cursor) => {
167 if cursor.as_ref() == Some(&next_cursor) {
168 log::warn!(
169 "Session list pagination returned the same cursor; stopping to avoid a loop."
170 );
171 break;
172 }
173 cursor = Some(next_cursor);
174 }
175 None => break,
176 }
177 }
178 });
179 }
180
181 pub(crate) fn is_empty(&self) -> bool {
182 self.sessions.is_empty()
183 }
184
185 pub fn refresh(&mut self, _cx: &mut Context<Self>) {
186 self.session_list.notify_refresh();
187 }
188
189 pub fn session_for_id(&self, session_id: &acp::SessionId) -> Option<AgentSessionInfo> {
190 self.sessions
191 .iter()
192 .find(|entry| &entry.session_id == session_id)
193 .cloned()
194 }
195
196 pub(crate) fn sessions(&self) -> &[AgentSessionInfo] {
197 &self.sessions
198 }
199
200 pub(crate) fn get_recent_sessions(&self, limit: usize) -> Vec<AgentSessionInfo> {
201 self.sessions.iter().take(limit).cloned().collect()
202 }
203
204 pub fn supports_delete(&self) -> bool {
205 self.session_list.supports_delete()
206 }
207
208 pub(crate) fn delete_session(
209 &self,
210 session_id: &acp::SessionId,
211 cx: &mut App,
212 ) -> Task<anyhow::Result<()>> {
213 self.session_list.delete_session(session_id, cx)
214 }
215
216 pub(crate) fn delete_sessions(&self, cx: &mut App) -> Task<anyhow::Result<()>> {
217 self.session_list.delete_sessions(cx)
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use acp_thread::AgentSessionListResponse;
225 use gpui::TestAppContext;
226 use std::{
227 any::Any,
228 sync::{Arc, Mutex},
229 };
230
231 fn init_test(cx: &mut TestAppContext) {
232 cx.update(|cx| {
233 let settings_store = settings::SettingsStore::test(cx);
234 cx.set_global(settings_store);
235 theme_settings::init(theme::LoadThemes::JustBase, cx);
236 });
237 }
238
239 #[derive(Clone)]
240 struct TestSessionList {
241 sessions: Vec<AgentSessionInfo>,
242 updates_tx: smol::channel::Sender<SessionListUpdate>,
243 updates_rx: smol::channel::Receiver<SessionListUpdate>,
244 }
245
246 impl TestSessionList {
247 fn new(sessions: Vec<AgentSessionInfo>) -> Self {
248 let (tx, rx) = smol::channel::unbounded();
249 Self {
250 sessions,
251 updates_tx: tx,
252 updates_rx: rx,
253 }
254 }
255
256 fn send_update(&self, update: SessionListUpdate) {
257 self.updates_tx.try_send(update).ok();
258 }
259 }
260
261 impl AgentSessionList for TestSessionList {
262 fn list_sessions(
263 &self,
264 _request: AgentSessionListRequest,
265 _cx: &mut App,
266 ) -> Task<anyhow::Result<AgentSessionListResponse>> {
267 Task::ready(Ok(AgentSessionListResponse::new(self.sessions.clone())))
268 }
269
270 fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
271 Some(self.updates_rx.clone())
272 }
273
274 fn notify_refresh(&self) {
275 self.send_update(SessionListUpdate::Refresh);
276 }
277
278 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
279 self
280 }
281 }
282
283 #[derive(Clone)]
284 struct PaginatedTestSessionList {
285 first_page_sessions: Vec<AgentSessionInfo>,
286 second_page_sessions: Vec<AgentSessionInfo>,
287 requested_cursors: Arc<Mutex<Vec<Option<String>>>>,
288 async_responses: bool,
289 updates_tx: smol::channel::Sender<SessionListUpdate>,
290 updates_rx: smol::channel::Receiver<SessionListUpdate>,
291 }
292
293 impl PaginatedTestSessionList {
294 fn new(
295 first_page_sessions: Vec<AgentSessionInfo>,
296 second_page_sessions: Vec<AgentSessionInfo>,
297 ) -> Self {
298 let (tx, rx) = smol::channel::unbounded();
299 Self {
300 first_page_sessions,
301 second_page_sessions,
302 requested_cursors: Arc::new(Mutex::new(Vec::new())),
303 async_responses: false,
304 updates_tx: tx,
305 updates_rx: rx,
306 }
307 }
308
309 fn with_async_responses(mut self) -> Self {
310 self.async_responses = true;
311 self
312 }
313
314 fn requested_cursors(&self) -> Vec<Option<String>> {
315 self.requested_cursors.lock().unwrap().clone()
316 }
317
318 fn clear_requested_cursors(&self) {
319 self.requested_cursors.lock().unwrap().clear()
320 }
321
322 fn send_update(&self, update: SessionListUpdate) {
323 self.updates_tx.try_send(update).ok();
324 }
325 }
326
327 impl AgentSessionList for PaginatedTestSessionList {
328 fn list_sessions(
329 &self,
330 request: AgentSessionListRequest,
331 cx: &mut App,
332 ) -> Task<anyhow::Result<AgentSessionListResponse>> {
333 let requested_cursors = self.requested_cursors.clone();
334 let first_page_sessions = self.first_page_sessions.clone();
335 let second_page_sessions = self.second_page_sessions.clone();
336
337 let respond = move || {
338 requested_cursors
339 .lock()
340 .unwrap()
341 .push(request.cursor.clone());
342
343 match request.cursor.as_deref() {
344 None => AgentSessionListResponse {
345 sessions: first_page_sessions,
346 next_cursor: Some("page-2".to_string()),
347 meta: None,
348 },
349 Some("page-2") => AgentSessionListResponse::new(second_page_sessions),
350 _ => AgentSessionListResponse::new(Vec::new()),
351 }
352 };
353
354 if self.async_responses {
355 cx.foreground_executor().spawn(async move {
356 smol::future::yield_now().await;
357 Ok(respond())
358 })
359 } else {
360 Task::ready(Ok(respond()))
361 }
362 }
363
364 fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
365 Some(self.updates_rx.clone())
366 }
367
368 fn notify_refresh(&self) {
369 self.send_update(SessionListUpdate::Refresh);
370 }
371
372 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
373 self
374 }
375 }
376
377 fn test_session(session_id: &str, title: &str) -> AgentSessionInfo {
378 AgentSessionInfo {
379 session_id: acp::SessionId::new(session_id),
380 work_dirs: None,
381 title: Some(title.to_string().into()),
382 updated_at: None,
383 created_at: None,
384 meta: None,
385 }
386 }
387
388 #[gpui::test]
389 async fn test_refresh_only_loads_first_page_by_default(cx: &mut TestAppContext) {
390 init_test(cx);
391
392 let session_list = Rc::new(PaginatedTestSessionList::new(
393 vec![test_session("session-1", "First")],
394 vec![test_session("session-2", "Second")],
395 ));
396
397 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
398 cx.run_until_parked();
399
400 history.update(cx, |history, _cx| {
401 assert_eq!(history.sessions.len(), 1);
402 assert_eq!(
403 history.sessions[0].session_id,
404 acp::SessionId::new("session-1")
405 );
406 });
407 assert_eq!(session_list.requested_cursors(), vec![None]);
408 }
409
410 #[gpui::test]
411 async fn test_enabling_full_pagination_loads_all_pages(cx: &mut TestAppContext) {
412 init_test(cx);
413
414 let session_list = Rc::new(PaginatedTestSessionList::new(
415 vec![test_session("session-1", "First")],
416 vec![test_session("session-2", "Second")],
417 ));
418
419 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
420 cx.run_until_parked();
421 session_list.clear_requested_cursors();
422
423 history.update(cx, |history, cx| history.refresh_full_history(cx));
424 cx.run_until_parked();
425
426 history.update(cx, |history, _cx| {
427 assert_eq!(history.sessions.len(), 2);
428 assert_eq!(
429 history.sessions[0].session_id,
430 acp::SessionId::new("session-1")
431 );
432 assert_eq!(
433 history.sessions[1].session_id,
434 acp::SessionId::new("session-2")
435 );
436 });
437 assert_eq!(
438 session_list.requested_cursors(),
439 vec![None, Some("page-2".to_string())]
440 );
441 }
442
443 #[gpui::test]
444 async fn test_standard_refresh_replaces_with_first_page_after_full_history_refresh(
445 cx: &mut TestAppContext,
446 ) {
447 init_test(cx);
448
449 let session_list = Rc::new(PaginatedTestSessionList::new(
450 vec![test_session("session-1", "First")],
451 vec![test_session("session-2", "Second")],
452 ));
453
454 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
455 cx.run_until_parked();
456
457 history.update(cx, |history, cx| history.refresh_full_history(cx));
458 cx.run_until_parked();
459 session_list.clear_requested_cursors();
460
461 history.update(cx, |history, cx| {
462 history.refresh(cx);
463 });
464 cx.run_until_parked();
465
466 history.update(cx, |history, _cx| {
467 assert_eq!(history.sessions.len(), 1);
468 assert_eq!(
469 history.sessions[0].session_id,
470 acp::SessionId::new("session-1")
471 );
472 });
473 assert_eq!(session_list.requested_cursors(), vec![None]);
474 }
475
476 #[gpui::test]
477 async fn test_re_entering_full_pagination_reloads_all_pages(cx: &mut TestAppContext) {
478 init_test(cx);
479
480 let session_list = Rc::new(PaginatedTestSessionList::new(
481 vec![test_session("session-1", "First")],
482 vec![test_session("session-2", "Second")],
483 ));
484
485 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
486 cx.run_until_parked();
487
488 history.update(cx, |history, cx| history.refresh_full_history(cx));
489 cx.run_until_parked();
490 session_list.clear_requested_cursors();
491
492 history.update(cx, |history, cx| history.refresh_full_history(cx));
493 cx.run_until_parked();
494
495 history.update(cx, |history, _cx| {
496 assert_eq!(history.sessions.len(), 2);
497 });
498 assert_eq!(
499 session_list.requested_cursors(),
500 vec![None, Some("page-2".to_string())]
501 );
502 }
503
504 #[gpui::test]
505 async fn test_partial_refresh_batch_drops_non_first_page_sessions(cx: &mut TestAppContext) {
506 init_test(cx);
507
508 let second_page_session_id = acp::SessionId::new("session-2");
509 let session_list = Rc::new(PaginatedTestSessionList::new(
510 vec![test_session("session-1", "First")],
511 vec![test_session("session-2", "Second")],
512 ));
513
514 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
515 cx.run_until_parked();
516
517 history.update(cx, |history, cx| history.refresh_full_history(cx));
518 cx.run_until_parked();
519
520 session_list.clear_requested_cursors();
521
522 session_list.send_update(SessionListUpdate::SessionInfo {
523 session_id: second_page_session_id.clone(),
524 update: acp::SessionInfoUpdate::new().title("Updated Second"),
525 });
526 session_list.send_update(SessionListUpdate::Refresh);
527 cx.run_until_parked();
528
529 history.update(cx, |history, _cx| {
530 assert_eq!(history.sessions.len(), 1);
531 assert_eq!(
532 history.sessions[0].session_id,
533 acp::SessionId::new("session-1")
534 );
535 assert!(
536 history
537 .sessions
538 .iter()
539 .all(|session| session.session_id != second_page_session_id)
540 );
541 });
542 assert_eq!(session_list.requested_cursors(), vec![None]);
543 }
544
545 #[gpui::test]
546 async fn test_full_pagination_works_with_async_page_fetches(cx: &mut TestAppContext) {
547 init_test(cx);
548
549 let session_list = Rc::new(
550 PaginatedTestSessionList::new(
551 vec![test_session("session-1", "First")],
552 vec![test_session("session-2", "Second")],
553 )
554 .with_async_responses(),
555 );
556
557 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
558 cx.run_until_parked();
559 session_list.clear_requested_cursors();
560
561 history.update(cx, |history, cx| history.refresh_full_history(cx));
562 cx.run_until_parked();
563
564 history.update(cx, |history, _cx| {
565 assert_eq!(history.sessions.len(), 2);
566 });
567 assert_eq!(
568 session_list.requested_cursors(),
569 vec![None, Some("page-2".to_string())]
570 );
571 }
572
573 #[gpui::test]
574 async fn test_apply_info_update_title(cx: &mut TestAppContext) {
575 init_test(cx);
576
577 let session_id = acp::SessionId::new("test-session");
578 let sessions = vec![AgentSessionInfo {
579 session_id: session_id.clone(),
580 work_dirs: None,
581 title: Some("Original Title".into()),
582 updated_at: None,
583 created_at: None,
584 meta: None,
585 }];
586 let session_list = Rc::new(TestSessionList::new(sessions));
587
588 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
589 cx.run_until_parked();
590
591 session_list.send_update(SessionListUpdate::SessionInfo {
592 session_id: session_id.clone(),
593 update: acp::SessionInfoUpdate::new().title("New Title"),
594 });
595 cx.run_until_parked();
596
597 history.update(cx, |history, _cx| {
598 let session = history.sessions.iter().find(|s| s.session_id == session_id);
599 assert_eq!(
600 session.unwrap().title.as_ref().map(|s| s.as_ref()),
601 Some("New Title")
602 );
603 });
604 }
605
606 #[gpui::test]
607 async fn test_apply_info_update_clears_title_with_null(cx: &mut TestAppContext) {
608 init_test(cx);
609
610 let session_id = acp::SessionId::new("test-session");
611 let sessions = vec![AgentSessionInfo {
612 session_id: session_id.clone(),
613 work_dirs: None,
614 title: Some("Original Title".into()),
615 updated_at: None,
616 created_at: None,
617 meta: None,
618 }];
619 let session_list = Rc::new(TestSessionList::new(sessions));
620
621 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
622 cx.run_until_parked();
623
624 session_list.send_update(SessionListUpdate::SessionInfo {
625 session_id: session_id.clone(),
626 update: acp::SessionInfoUpdate::new().title(None::<String>),
627 });
628 cx.run_until_parked();
629
630 history.update(cx, |history, _cx| {
631 let session = history.sessions.iter().find(|s| s.session_id == session_id);
632 assert_eq!(session.unwrap().title, None);
633 });
634 }
635
636 #[gpui::test]
637 async fn test_apply_info_update_ignores_undefined_fields(cx: &mut TestAppContext) {
638 init_test(cx);
639
640 let session_id = acp::SessionId::new("test-session");
641 let sessions = vec![AgentSessionInfo {
642 session_id: session_id.clone(),
643 work_dirs: None,
644 title: Some("Original Title".into()),
645 updated_at: None,
646 created_at: None,
647 meta: None,
648 }];
649 let session_list = Rc::new(TestSessionList::new(sessions));
650
651 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
652 cx.run_until_parked();
653
654 session_list.send_update(SessionListUpdate::SessionInfo {
655 session_id: session_id.clone(),
656 update: acp::SessionInfoUpdate::new(),
657 });
658 cx.run_until_parked();
659
660 history.update(cx, |history, _cx| {
661 let session = history.sessions.iter().find(|s| s.session_id == session_id);
662 assert_eq!(
663 session.unwrap().title.as_ref().map(|s| s.as_ref()),
664 Some("Original Title")
665 );
666 });
667 }
668
669 #[gpui::test]
670 async fn test_multiple_info_updates_applied_in_order(cx: &mut TestAppContext) {
671 init_test(cx);
672
673 let session_id = acp::SessionId::new("test-session");
674 let sessions = vec![AgentSessionInfo {
675 session_id: session_id.clone(),
676 work_dirs: None,
677 title: None,
678 updated_at: None,
679 created_at: None,
680 meta: None,
681 }];
682 let session_list = Rc::new(TestSessionList::new(sessions));
683
684 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
685 cx.run_until_parked();
686
687 session_list.send_update(SessionListUpdate::SessionInfo {
688 session_id: session_id.clone(),
689 update: acp::SessionInfoUpdate::new().title("First Title"),
690 });
691 session_list.send_update(SessionListUpdate::SessionInfo {
692 session_id: session_id.clone(),
693 update: acp::SessionInfoUpdate::new().title("Second Title"),
694 });
695 cx.run_until_parked();
696
697 history.update(cx, |history, _cx| {
698 let session = history.sessions.iter().find(|s| s.session_id == session_id);
699 assert_eq!(
700 session.unwrap().title.as_ref().map(|s| s.as_ref()),
701 Some("Second Title")
702 );
703 });
704 }
705
706 #[gpui::test]
707 async fn test_refresh_supersedes_info_updates(cx: &mut TestAppContext) {
708 init_test(cx);
709
710 let session_id = acp::SessionId::new("test-session");
711 let sessions = vec![AgentSessionInfo {
712 session_id: session_id.clone(),
713 work_dirs: None,
714 title: Some("Server Title".into()),
715 updated_at: None,
716 created_at: None,
717 meta: None,
718 }];
719 let session_list = Rc::new(TestSessionList::new(sessions));
720
721 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
722 cx.run_until_parked();
723
724 session_list.send_update(SessionListUpdate::SessionInfo {
725 session_id: session_id.clone(),
726 update: acp::SessionInfoUpdate::new().title("Local Update"),
727 });
728 session_list.send_update(SessionListUpdate::Refresh);
729 cx.run_until_parked();
730
731 history.update(cx, |history, _cx| {
732 let session = history.sessions.iter().find(|s| s.session_id == session_id);
733 assert_eq!(
734 session.unwrap().title.as_ref().map(|s| s.as_ref()),
735 Some("Server Title")
736 );
737 });
738 }
739
740 #[gpui::test]
741 async fn test_info_update_for_unknown_session_is_ignored(cx: &mut TestAppContext) {
742 init_test(cx);
743
744 let session_id = acp::SessionId::new("known-session");
745 let sessions = vec![AgentSessionInfo {
746 session_id,
747 work_dirs: None,
748 title: Some("Original".into()),
749 updated_at: None,
750 created_at: None,
751 meta: None,
752 }];
753 let session_list = Rc::new(TestSessionList::new(sessions));
754
755 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
756 cx.run_until_parked();
757
758 session_list.send_update(SessionListUpdate::SessionInfo {
759 session_id: acp::SessionId::new("unknown-session"),
760 update: acp::SessionInfoUpdate::new().title("Should Be Ignored"),
761 });
762 cx.run_until_parked();
763
764 history.update(cx, |history, _cx| {
765 assert_eq!(history.sessions.len(), 1);
766 assert_eq!(
767 history.sessions[0].title.as_ref().map(|s| s.as_ref()),
768 Some("Original")
769 );
770 });
771 }
772}